This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 5da97515c feat(redis): delay according to the cmd & key (#6999)
5da97515c is described below

commit 5da97515cff8da8fa0f1c8ab9ee55662bd871f13
Author: 罗泽轩 <[email protected]>
AuthorDate: Tue May 10 09:44:21 2022 +0800

    feat(redis): delay according to the cmd & key (#6999)
    
    Signed-off-by: spacewander <[email protected]>
---
 .ignore_words                                   |   1 +
 apisix/stream/xrpc/protocols/redis/commands.lua | 222 +++++++++++++++++
 apisix/stream/xrpc/protocols/redis/init.lua     | 113 ++++++++-
 apisix/stream/xrpc/protocols/redis/schema.lua   |  25 ++
 apisix/stream/xrpc/runner.lua                   |   5 +-
 apisix/stream/xrpc/sdk.lua                      |   4 +-
 t/xrpc/redis.t                                  | 308 ++++++++++++++++++++++++
 7 files changed, 669 insertions(+), 9 deletions(-)

diff --git a/.ignore_words b/.ignore_words
index 33b02228e..5e9354537 100644
--- a/.ignore_words
+++ b/.ignore_words
@@ -6,3 +6,4 @@ shttp
 nd
 hel
 nulll
+smove
diff --git a/apisix/stream/xrpc/protocols/redis/commands.lua 
b/apisix/stream/xrpc/protocols/redis/commands.lua
new file mode 100644
index 000000000..ff3338fde
--- /dev/null
+++ b/apisix/stream/xrpc/protocols/redis/commands.lua
@@ -0,0 +1,222 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local ipairs = ipairs
+local pairs = pairs
+
+
+local cmd_to_key_finder = {}
+--[[
+-- the data is generated from the script below
+local redis = require "resty.redis"
+local red = redis:new()
+
+local ok, err = red:connect("127.0.0.1", 6379)
+if not ok then
+    ngx.say("failed to connect: ", err)
+    return
+end
+
+local res = red:command("info")
+local map = {}
+for _, r in ipairs(res) do
+    local first_key = r[4]
+    local last_key = r[5]
+    local step = r[6]
+    local idx = first_key .. ':' .. last_key .. ':' .. step
+
+    if idx ~= "1:1:1" then
+        -- "1:1:1" is the default
+        if map[idx] then
+            table.insert(map[idx], r[1])
+        else
+            map[idx] = {r[1]}
+        end
+    end
+end
+for _, r in pairs(map) do
+    table.sort(r)
+end
+local dump = require('pl.pretty').dump; dump(map)
+--]]
+local key_to_cmd = {
+    ["0:0:0"] = {
+        "acl",
+        "asking",
+        "auth",
+        "bgrewriteaof",
+        "bgsave",
+        "blmpop",
+        "bzmpop",
+        "client",
+        "cluster",
+        "command",
+        "config",
+        "dbsize",
+        "debug",
+        "discard",
+        "echo",
+        "eval",
+        "eval_ro",
+        "evalsha",
+        "evalsha_ro",
+        "exec",
+        "failover",
+        "fcall",
+        "fcall_ro",
+        "flushall",
+        "flushdb",
+        "function",
+        "hello",
+        "info",
+        "keys",
+        "lastsave",
+        "latency",
+        "lmpop",
+        "lolwut",
+        "memory",
+        "module",
+        "monitor",
+        "multi",
+        "object",
+        "pfselftest",
+        "ping",
+        "psubscribe",
+        "psync",
+        "publish",
+        "pubsub",
+        "punsubscribe",
+        "quit",
+        "randomkey",
+        "readonly",
+        "readwrite",
+        "replconf",
+        "replicaof",
+        "reset",
+        "role",
+        "save",
+        "scan",
+        "script",
+        "select",
+        "shutdown",
+        "sintercard",
+        "slaveof",
+        "slowlog",
+        "subscribe",
+        "swapdb",
+        "sync",
+        "time",
+        "unsubscribe",
+        "unwatch",
+        "wait",
+        "xgroup",
+        "xinfo",
+        "xread",
+        "xreadgroup",
+        "zdiff",
+        "zinter",
+        "zintercard",
+        "zmpop",
+        "zunion"
+    },
+    ["1:-1:1"] = {
+        "del",
+        "exists",
+        "mget",
+        "pfcount",
+        "pfmerge",
+        "sdiff",
+        "sdiffstore",
+        "sinter",
+        "sinterstore",
+        "ssubscribe",
+        "sunion",
+        "sunionstore",
+        "sunsubscribe",
+        "touch",
+        "unlink",
+        "watch"
+    },
+    ["1:-1:2"] = {
+        "mset",
+        "msetnx"
+    },
+    ["1:-2:1"] = {
+        "blpop",
+        "brpop",
+        "bzpopmax",
+        "bzpopmin"
+    },
+    ["1:2:1"] = {
+        "blmove",
+        "brpoplpush",
+        "copy",
+        "geosearchstore",
+        "lcs",
+        "lmove",
+        "rename",
+        "renamenx",
+        "rpoplpush",
+        "smove",
+        "zrangestore"
+    },
+    ["2:-1:1"] = {
+        "bitop"
+    },
+    ["2:2:1"] = {
+        "pfdebug"
+    },
+    ["3:3:1"] = {
+        "migrate"
+    }
+}
+local key_finders = {
+    ["0:0:0"] = false,
+    ["1:-1:1"] = function (idx, narg)
+        return 1 < idx
+    end,
+    ["1:-1:2"] = function (idx, narg)
+        return 1 < idx and idx % 2 == 0
+    end,
+    ["1:-2:1"] = function (idx, narg)
+        return 1 < idx and idx < narg - 1
+    end,
+    ["1:2:1"] = function (idx, narg)
+        return idx == 2 or idx == 3
+    end,
+    ["2:-1:1"] = function (idx, narg)
+        return 2 < idx
+    end,
+    ["2:2:1"] = function (idx, narg)
+        return idx == 3
+    end,
+    ["3:3:1"] = function (idx, narg)
+        return idx == 4
+    end
+}
+for k, cmds in pairs(key_to_cmd) do
+    for _, cmd in ipairs(cmds) do
+        cmd_to_key_finder[cmd] = key_finders[k]
+    end
+end
+
+
+return {
+    cmd_to_key_finder = cmd_to_key_finder,
+    default_key_finder = function (idx, narg)
+        return idx == 2
+    end,
+}
diff --git a/apisix/stream/xrpc/protocols/redis/init.lua 
b/apisix/stream/xrpc/protocols/redis/init.lua
index ac31a4521..2635cca71 100644
--- a/apisix/stream/xrpc/protocols/redis/init.lua
+++ b/apisix/stream/xrpc/protocols/redis/init.lua
@@ -16,6 +16,7 @@
 --
 local core = require("apisix.core")
 local sdk = require("apisix.stream.xrpc.sdk")
+local commands = require("apisix.stream.xrpc.protocols.redis.commands")
 local xrpc_socket = require("resty.apisix.stream.xrpc.socket")
 local ffi = require("ffi")
 local ffi_str = ffi.string
@@ -23,8 +24,10 @@ local math_random = math.random
 local OK = ngx.OK
 local DECLINED = ngx.DECLINED
 local DONE = ngx.DONE
+local sleep = ngx.sleep
 local str_byte = string.byte
 local str_fmt = string.format
+local ipairs = ipairs
 local tonumber = tonumber
 
 
@@ -32,6 +35,7 @@ local tonumber = tonumber
 -- There is no plan to support inline command format
 local _M = {}
 local MAX_LINE_LEN = 128
+local MAX_VALUE_LEN = 128
 local PREFIX_ARR = str_byte("*")
 local PREFIX_STR = str_byte("$")
 local PREFIX_STA = str_byte("+")
@@ -39,7 +43,62 @@ local PREFIX_INT = str_byte(":")
 local PREFIX_ERR = str_byte("-")
 
 
+local lrucache = core.lrucache.new({
+    type = "plugin",
+})
+
+
+local function create_matcher(conf)
+    local matcher = {}
+    --[[
+        {"delay": 5, "key":"x", "commands":["GET", "MGET"]}
+        {"delay": 5, "commands":["GET"]}
+        => {
+            get = {keys = {x = {delay = 5}, * = {delay = 5}}}
+            mget = {keys = {x = {delay = 5}}}
+        }
+    ]]--
+    for _, rule in ipairs(conf.faults) do
+        for _, cmd in ipairs(rule.commands) do
+            cmd = cmd:lower()
+            local key = rule.key
+            local kf = commands.cmd_to_key_finder[cmd]
+            local key_matcher = matcher[cmd]
+            if not key_matcher then
+                key_matcher = {
+                    keys = {}
+                }
+                matcher[cmd] = key_matcher
+            end
+
+            if not key or kf == false then
+                key = "*"
+            end
+
+            if key_matcher.keys[key] then
+                core.log.warn("override existent fault rule of cmd: ", cmd, ", 
key: ", key)
+            end
+
+            key_matcher.keys[key] = rule
+        end
+    end
+
+    return matcher
+end
+
+
+local function get_matcher(conf, ctx)
+    return core.lrucache.plugin_ctx(lrucache, ctx, nil, create_matcher, conf)
+end
+
+
 function _M.init_downstream(session)
+    local conf = session.route.protocol.conf
+    if conf and conf.faults then
+        local matcher = get_matcher(conf, session.conn_ctx)
+        session.matcher = matcher
+    end
+
     session.req_id_seq = 0
     session.resp_id_seq = 0
     return xrpc_socket.downstream.socket()
@@ -83,26 +142,53 @@ local function read_req(session, sk)
 
     local cmd_line = core.tablepool.fetch("xrpc_redis_cmd_line", narg, 0)
 
-    for i = 1, narg do
+    local n, err = read_len(sk)
+    if not n then
+        return nil, err
+    end
+
+    local p, err = sk:read(n + 2)
+    if not p then
+        return nil, err
+    end
+
+    local s = ffi_str(p, n)
+    cmd_line[1] = s
+
+    local key_finder
+    local matcher = session.matcher
+    if matcher then
+        matcher = matcher[s:lower()]
+        if matcher then
+            key_finder = commands.cmd_to_key_finder[s] or 
commands.default_key_finder
+        end
+    end
+
+    for i = 2, narg do
+        local is_key = false
+        if key_finder then
+            is_key = key_finder(i, narg)
+        end
+
         local n, err = read_len(sk)
         if not n then
             return nil, err
         end
 
         local s
-        if n > 1024 then
+        if not is_key and n > MAX_VALUE_LEN then
             -- avoid recording big value
-            local p, err = sk:read(1024)
+            local p, err = sk:read(MAX_VALUE_LEN)
             if not p then
                 return nil, err
             end
 
-            local ok, err = sk:drain(n - 1024 + 2)
+            local ok, err = sk:drain(n - MAX_VALUE_LEN + 2)
             if not ok then
                 return nil, err
             end
 
-            s = ffi_str(p, 1024) .. "..."
+            s = ffi_str(p, MAX_VALUE_LEN) .. "...(" .. n .. " bytes)"
         else
             local p, err = sk:read(n + 2)
             if not p then
@@ -110,6 +196,11 @@ local function read_req(session, sk)
             end
 
             s = ffi_str(p, n)
+
+            if is_key and matcher.keys[s] then
+                matcher = matcher.keys[s]
+                key_finder = nil
+            end
         end
 
         cmd_line[i] = s
@@ -121,6 +212,18 @@ local function read_req(session, sk)
     ctx.cmd = ctx.cmd_line[1]
 
     local pipelined = sk:has_pending_data()
+
+    if matcher then
+        if matcher.keys then
+            -- try to match any key of this command
+            matcher = matcher.keys["*"]
+        end
+
+        if matcher then
+            sleep(matcher.delay)
+        end
+    end
+
     return true, nil, pipelined
 end
 
diff --git a/apisix/stream/xrpc/protocols/redis/schema.lua 
b/apisix/stream/xrpc/protocols/redis/schema.lua
index 49a7a0e48..0b6c90c65 100644
--- a/apisix/stream/xrpc/protocols/redis/schema.lua
+++ b/apisix/stream/xrpc/protocols/redis/schema.lua
@@ -20,6 +20,31 @@ local core = require("apisix.core")
 local schema = {
     type = "object",
     properties = {
+        faults = {
+            type = "array",
+            minItems = 1,
+            items = {
+                type = "object",
+                properties = {
+                    commands = {
+                        type = "array",
+                        minItems = 1,
+                        items = {
+                            type = "string"
+                        },
+                    },
+                    key = {
+                        type = "string",
+                        minLength = 1,
+                    },
+                    delay = {
+                        type = "number",
+                        description = "additional delay in seconds",
+                    }
+                },
+                required = {"commands", "delay"}
+            },
+        },
     },
 }
 
diff --git a/apisix/stream/xrpc/runner.lua b/apisix/stream/xrpc/runner.lua
index 8de3176ff..ea807f60a 100644
--- a/apisix/stream/xrpc/runner.lua
+++ b/apisix/stream/xrpc/runner.lua
@@ -28,9 +28,10 @@ local _M = {}
 
 local function open_session(conn_ctx)
     conn_ctx.xrpc_session = {
-        _upstream_conf = conn_ctx.matched_upstream,
+        conn_ctx = conn_ctx,
+        route = conn_ctx.matched_route.value,
         -- fields start with '_' should not be accessed by the protocol 
implementation
-        _route = conn_ctx.matched_route.value,
+        _upstream_conf = conn_ctx.matched_upstream,
         _ctxs = {},
     }
     return conn_ctx.xrpc_session
diff --git a/apisix/stream/xrpc/sdk.lua b/apisix/stream/xrpc/sdk.lua
index 9773cb36f..3e3f4557a 100644
--- a/apisix/stream/xrpc/sdk.lua
+++ b/apisix/stream/xrpc/sdk.lua
@@ -125,8 +125,8 @@ end
 -- @treturn table the new router under the specific protocol
 -- @treturn string the new router version
 function _M.get_router(session, version)
-    local protocol_name = session._route.protocol.name
-    local id = session._route.id
+    local protocol_name = session.route.protocol.name
+    local id = session.route.id
 
     local items, conf_version = router.routes()
     if version == conf_version then
diff --git a/t/xrpc/redis.t b/t/xrpc/redis.t
index 52a5fa0f5..a3a9ec9ca 100644
--- a/t/xrpc/redis.t
+++ b/t/xrpc/redis.t
@@ -257,3 +257,311 @@ hget animals: bark
     }
 --- response_body
 --- stream_conf_enable
+
+
+
+=== TEST 6: delay
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local etcd = require("apisix.core.etcd")
+            local code, body = t('/apisix/admin/stream_routes/1',
+                ngx.HTTP_PUT,
+                {
+                    protocol = {
+                        name = "redis",
+                        conf = {
+                            faults = {
+                                {delay = 0.01, key = "ignored", commands = 
{"Ping", "time"}}
+                            }
+                        }
+                    },
+                    upstream = {
+                        nodes = {
+                            ["127.0.0.1:6379"] = 1
+                        },
+                        type = "roundrobin"
+                    }
+                }
+            )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 7: hit
+--- config
+    location /t {
+        content_by_lua_block {
+            local redis = require "resty.redis"
+            local red = redis:new()
+
+            local ok, err = red:connect("127.0.0.1", $TEST_NGINX_REDIS_PORT)
+            if not ok then
+                ngx.say("failed to connect: ", err)
+                return
+            end
+
+            local start = ngx.now()
+            local res, err = red:ping()
+            if not res then
+                ngx.say(err)
+                return
+            end
+            local now = ngx.now()
+            -- use integer to bypass float point number precision problem
+            if math.ceil((now - start) * 1000) < 10 then
+                ngx.say(now, " ", start)
+                return
+            end
+            start = now
+
+            local res, err = red:time()
+            if not res then
+                ngx.say(err)
+                return
+            end
+            local now = ngx.now()
+            if math.ceil((now - start) * 1000) < 10 then
+                ngx.say(now, " ", start)
+                return
+            end
+            start = now
+
+            red:init_pipeline()
+            red:time()
+            red:time()
+            red:get("A")
+
+            local results, err = red:commit_pipeline()
+            if not results then
+                ngx.say("failed to commit: ", err)
+                return
+            end
+            local now = ngx.now()
+            if math.ceil((now - start) * 1000) < 20 or math.ceil((now - start) 
* 1000) > 30 then
+                ngx.say(now, " ", start)
+                return
+            end
+
+            ngx.say("ok")
+        }
+    }
+--- response_body
+ok
+--- stream_conf_enable
+
+
+
+=== TEST 8: DFS match
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local etcd = require("apisix.core.etcd")
+            local code, body = t('/apisix/admin/stream_routes/1',
+                ngx.HTTP_PUT,
+                {
+                    protocol = {
+                        name = "redis",
+                        conf = {
+                            faults = {
+                                {delay = 0.02, key = "a", commands = {"get"}},
+                                {delay = 0.01, commands = {"get", "set"}},
+                            }
+                        }
+                    },
+                    upstream = {
+                        nodes = {
+                            ["127.0.0.1:6379"] = 1
+                        },
+                        type = "roundrobin"
+                    }
+                }
+            )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 9: hit
+--- config
+    location /t {
+        content_by_lua_block {
+            local redis = require "resty.redis"
+            local red = redis:new()
+
+            local ok, err = red:connect("127.0.0.1", $TEST_NGINX_REDIS_PORT)
+            if not ok then
+                ngx.say("failed to connect: ", err)
+                return
+            end
+
+            local start = ngx.now()
+            local res, err = red:get("a")
+            if not res then
+                ngx.say(err)
+                return
+            end
+            local now = ngx.now()
+            if math.ceil((now - start) * 1000) < 20 then
+                ngx.say(now, " ", start)
+                return
+            end
+            start = now
+
+            local res, err = red:set("a", "a")
+            if not res then
+                ngx.say(err)
+                return
+            end
+            local now = ngx.now()
+            if math.ceil((now - start) * 1000) < 10 then
+                ngx.say(now, " ", start)
+                return
+            end
+            start = now
+
+            red:init_pipeline()
+            red:get("b")
+            red:set("A", "a")
+
+            local results, err = red:commit_pipeline()
+            if not results then
+                ngx.say("failed to commit: ", err)
+                return
+            end
+            local now = ngx.now()
+            if math.ceil((now - start) * 1000) < 20 or math.ceil((now - start) 
* 1000) > 30 then
+                ngx.say(now, " ", start)
+                return
+            end
+
+            ngx.say("ok")
+        }
+    }
+--- response_body
+ok
+--- stream_conf_enable
+
+
+
+=== TEST 10: multi keys
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local etcd = require("apisix.core.etcd")
+            local code, body = t('/apisix/admin/stream_routes/1',
+                ngx.HTTP_PUT,
+                {
+                    protocol = {
+                        name = "redis",
+                        conf = {
+                            faults = {
+                                {delay = 0.03, key = "b", commands = {"del"}},
+                                {delay = 0.02, key = "a", commands = {"mset"}},
+                                {delay = 0.01, key = "b", commands = {"mset"}},
+                            }
+                        }
+                    },
+                    upstream = {
+                        nodes = {
+                            ["127.0.0.1:6379"] = 1
+                        },
+                        type = "roundrobin"
+                    }
+                }
+            )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 11: hit
+--- config
+    location /t {
+        content_by_lua_block {
+            local redis = require "resty.redis"
+            local red = redis:new()
+
+            local ok, err = red:connect("127.0.0.1", $TEST_NGINX_REDIS_PORT)
+            if not ok then
+                ngx.say("failed to connect: ", err)
+                return
+            end
+
+            local start = ngx.now()
+            local res, err = red:mset("c", 1, "a", 2, "b", 3)
+            if not res then
+                ngx.say(err)
+                return
+            end
+            local now = ngx.now()
+            if math.ceil((now - start) * 1000) < 20 then
+                ngx.say(now, " ", start)
+                return
+            end
+            start = now
+
+            local res, err = red:mset("b", 2, "a", 3)
+            if not res then
+                ngx.say(err)
+                return
+            end
+            local now = ngx.now()
+            if math.ceil((now - start) * 1000) < 10 or math.ceil((now - start) 
* 1000) > 15 then
+                ngx.say(now, " ", start)
+                return
+            end
+            start = now
+
+            local res, err = red:mset("c", "a")
+            if not res then
+                ngx.say(err)
+                return
+            end
+            local now = ngx.now()
+            if math.ceil((now - start) * 1000) > 5 then
+                ngx.say(now, " ", start)
+                return
+            end
+            start = now
+
+            local res, err = red:del("a", "b")
+            if not res then
+                ngx.say(err)
+                return
+            end
+            local now = ngx.now()
+            if math.ceil((now - start) * 1000) < 30 then
+                ngx.say(now, " ", start)
+                return
+            end
+            start = now
+
+            ngx.say("ok")
+        }
+    }
+--- response_body
+ok
+--- stream_conf_enable

Reply via email to