This is an automated email from the ASF dual-hosted git repository.
spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new 5da97515c feat(redis): delay according to the cmd & key (#6999)
5da97515c is described below
commit 5da97515cff8da8fa0f1c8ab9ee55662bd871f13
Author: 罗泽轩 <[email protected]>
AuthorDate: Tue May 10 09:44:21 2022 +0800
feat(redis): delay according to the cmd & key (#6999)
Signed-off-by: spacewander <[email protected]>
---
.ignore_words | 1 +
apisix/stream/xrpc/protocols/redis/commands.lua | 222 +++++++++++++++++
apisix/stream/xrpc/protocols/redis/init.lua | 113 ++++++++-
apisix/stream/xrpc/protocols/redis/schema.lua | 25 ++
apisix/stream/xrpc/runner.lua | 5 +-
apisix/stream/xrpc/sdk.lua | 4 +-
t/xrpc/redis.t | 308 ++++++++++++++++++++++++
7 files changed, 669 insertions(+), 9 deletions(-)
diff --git a/.ignore_words b/.ignore_words
index 33b02228e..5e9354537 100644
--- a/.ignore_words
+++ b/.ignore_words
@@ -6,3 +6,4 @@ shttp
nd
hel
nulll
+smove
diff --git a/apisix/stream/xrpc/protocols/redis/commands.lua b/apisix/stream/xrpc/protocols/redis/commands.lua
new file mode 100644
index 000000000..ff3338fde
--- /dev/null
+++ b/apisix/stream/xrpc/protocols/redis/commands.lua
@@ -0,0 +1,222 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local ipairs = ipairs
+local pairs = pairs
+
+
+local cmd_to_key_finder = {}
+--[[
+-- the data is generated from the script below
+local redis = require "resty.redis"
+local red = redis:new()
+
+local ok, err = red:connect("127.0.0.1", 6379)
+if not ok then
+ ngx.say("failed to connect: ", err)
+ return
+end
+
+local res = red:command("info")
+local map = {}
+for _, r in ipairs(res) do
+ local first_key = r[4]
+ local last_key = r[5]
+ local step = r[6]
+ local idx = first_key .. ':' .. last_key .. ':' .. step
+
+ if idx ~= "1:1:1" then
+ -- "1:1:1" is the default
+ if map[idx] then
+ table.insert(map[idx], r[1])
+ else
+ map[idx] = {r[1]}
+ end
+ end
+end
+for _, r in pairs(map) do
+ table.sort(r)
+end
+local dump = require('pl.pretty').dump; dump(map)
+--]]
+-- Commands grouped by where their keys sit in the argument list.
+-- The index is "first_key:last_key:step", built by the generator script in the
+-- comment above. Commands whose layout is "1:1:1" are not listed because that
+-- layout is the default.
+local key_to_cmd = {
+ ["0:0:0"] = {
+ "acl",
+ "asking",
+ "auth",
+ "bgrewriteaof",
+ "bgsave",
+ "blmpop",
+ "bzmpop",
+ "client",
+ "cluster",
+ "command",
+ "config",
+ "dbsize",
+ "debug",
+ "discard",
+ "echo",
+ "eval",
+ "eval_ro",
+ "evalsha",
+ "evalsha_ro",
+ "exec",
+ "failover",
+ "fcall",
+ "fcall_ro",
+ "flushall",
+ "flushdb",
+ "function",
+ "hello",
+ "info",
+ "keys",
+ "lastsave",
+ "latency",
+ "lmpop",
+ "lolwut",
+ "memory",
+ "module",
+ "monitor",
+ "multi",
+ "object",
+ "pfselftest",
+ "ping",
+ "psubscribe",
+ "psync",
+ "publish",
+ "pubsub",
+ "punsubscribe",
+ "quit",
+ "randomkey",
+ "readonly",
+ "readwrite",
+ "replconf",
+ "replicaof",
+ "reset",
+ "role",
+ "save",
+ "scan",
+ "script",
+ "select",
+ "shutdown",
+ "sintercard",
+ "slaveof",
+ "slowlog",
+ "subscribe",
+ "swapdb",
+ "sync",
+ "time",
+ "unsubscribe",
+ "unwatch",
+ "wait",
+ "xgroup",
+ "xinfo",
+ "xread",
+ "xreadgroup",
+ "zdiff",
+ "zinter",
+ "zintercard",
+ "zmpop",
+ "zunion"
+ },
+ ["1:-1:1"] = {
+ "del",
+ "exists",
+ "mget",
+ "pfcount",
+ "pfmerge",
+ "sdiff",
+ "sdiffstore",
+ "sinter",
+ "sinterstore",
+ "ssubscribe",
+ "sunion",
+ "sunionstore",
+ "sunsubscribe",
+ "touch",
+ "unlink",
+ "watch"
+ },
+ ["1:-1:2"] = {
+ "mset",
+ "msetnx"
+ },
+ ["1:-2:1"] = {
+ "blpop",
+ "brpop",
+ "bzpopmax",
+ "bzpopmin"
+ },
+ ["1:2:1"] = {
+ "blmove",
+ "brpoplpush",
+ "copy",
+ "geosearchstore",
+ "lcs",
+ "lmove",
+ "rename",
+ "renamenx",
+ "rpoplpush",
+ "smove",
+ "zrangestore"
+ },
+ ["2:-1:1"] = {
+ "bitop"
+ },
+ ["2:2:1"] = {
+ "pfdebug"
+ },
+ ["3:3:1"] = {
+ "migrate"
+ }
+}
+-- Key finders, indexed by the same "first_key:last_key:step" string as
+-- key_to_cmd.
+--
+-- A finder is called as finder(idx, narg) and returns true when the argument at
+-- position `idx` is a key:
+--   idx:  1-based position in the command line; the command name is at
+--         position 1, so the first argument is at position 2
+--   narg: number of elements in the command line, command name included
+-- `false` marks the commands which take no key at all.
+local key_finders = {
+    ["0:0:0"] = false,
+    ["1:-1:1"] = function (idx, narg)
+        -- every argument is a key
+        return 1 < idx
+    end,
+    ["1:-1:2"] = function (idx, narg)
+        -- key/value pairs: the keys are at the even positions
+        return 1 < idx and idx % 2 == 0
+    end,
+    ["1:-2:1"] = function (idx, narg)
+        -- every argument but the trailing one (e.g. the timeout of BLPOP) is a
+        -- key, so the last key is at position narg - 1
+        return 1 < idx and idx < narg
+    end,
+    ["1:2:1"] = function (idx, narg)
+        -- the first two arguments are keys
+        return idx == 2 or idx == 3
+    end,
+    ["2:-1:1"] = function (idx, narg)
+        -- every argument but the first one is a key
+        return 2 < idx
+    end,
+    ["2:2:1"] = function (idx, narg)
+        -- only the second argument is a key
+        return idx == 3
+    end,
+    ["3:3:1"] = function (idx, narg)
+        -- only the third argument is a key
+        return idx == 4
+    end
+}
+-- flatten the groups: every listed command name points to the finder of its group
+for spec, names in pairs(key_to_cmd) do
+    local finder = key_finders[spec]
+    for _, name in ipairs(names) do
+        cmd_to_key_finder[name] = finder
+    end
+end
+
+
+-- Finder for the default "1:1:1" layout: the only key is the argument right
+-- after the command name.
+local function default_key_finder(idx, narg)
+    return idx == 2
+end
+
+
+return {
+    cmd_to_key_finder = cmd_to_key_finder,
+    default_key_finder = default_key_finder,
+}
diff --git a/apisix/stream/xrpc/protocols/redis/init.lua b/apisix/stream/xrpc/protocols/redis/init.lua
index ac31a4521..2635cca71 100644
--- a/apisix/stream/xrpc/protocols/redis/init.lua
+++ b/apisix/stream/xrpc/protocols/redis/init.lua
@@ -16,6 +16,7 @@
--
local core = require("apisix.core")
local sdk = require("apisix.stream.xrpc.sdk")
+local commands = require("apisix.stream.xrpc.protocols.redis.commands")
local xrpc_socket = require("resty.apisix.stream.xrpc.socket")
local ffi = require("ffi")
local ffi_str = ffi.string
@@ -23,8 +24,10 @@ local math_random = math.random
local OK = ngx.OK
local DECLINED = ngx.DECLINED
local DONE = ngx.DONE
+local sleep = ngx.sleep
local str_byte = string.byte
local str_fmt = string.format
+local ipairs = ipairs
local tonumber = tonumber
@@ -32,6 +35,7 @@ local tonumber = tonumber
-- There is no plan to support inline command format
local _M = {}
local MAX_LINE_LEN = 128
+local MAX_VALUE_LEN = 128
local PREFIX_ARR = str_byte("*")
local PREFIX_STR = str_byte("$")
local PREFIX_STA = str_byte("+")
@@ -39,7 +43,62 @@ local PREFIX_INT = str_byte(":")
local PREFIX_ERR = str_byte("-")
+-- cache handed to core.lrucache.plugin_ctx() in get_matcher(), where
+-- create_matcher() is the function that builds a missing entry
+local lrucache = core.lrucache.new({
+ type = "plugin",
+})
+
+
+-- Build the fault matcher from the protocol conf of a route.
+--
+-- @param conf the protocol conf; `conf.faults` is the list of fault rules
+-- @return a table indexed by the lowercased command name; each entry has a
+--         `keys` table which maps a key (or "*" for "any key") to its rule
+local function create_matcher(conf)
+    local matcher = {}
+    --[[
+        {"delay": 5, "key":"x", "commands":["GET", "MGET"]}
+        {"delay": 5, "commands":["GET"]}
+        => {
+            get = {keys = {x = {delay = 5}, * = {delay = 5}}}
+            mget = {keys = {x = {delay = 5}}}
+        }
+    ]]--
+    for _, rule in ipairs(conf.faults) do
+        for _, cmd in ipairs(rule.commands) do
+            -- command names are matched case-insensitively
+            cmd = cmd:lower()
+            local key = rule.key
+            local kf = commands.cmd_to_key_finder[cmd]
+            local key_matcher = matcher[cmd]
+            if not key_matcher then
+                key_matcher = {
+                    keys = {}
+                }
+                matcher[cmd] = key_matcher
+            end
+
+            -- a rule without key, or a rule for a command which takes no key
+            -- (its finder is `false`), applies to any invocation of the command
+            if not key or kf == false then
+                key = "*"
+            end
+
+            -- the rule listed last wins
+            if key_matcher.keys[key] then
+                core.log.warn("override existent fault rule of cmd: ", cmd,
+                              ", key: ", key)
+            end
+
+            key_matcher.keys[key] = rule
+        end
+    end
+
+    return matcher
+end
+
+
+-- Fetch the matcher of `conf` through the lrucache; create_matcher(conf) builds
+-- it when there is no cached one.
+local function get_matcher(conf, ctx)
+    -- NOTE(review): the third argument of plugin_ctx looks like an optional
+    -- extra cache key; none is needed here -- confirm against core.lrucache
+    local extra_key = nil
+    return core.lrucache.plugin_ctx(lrucache, ctx, extra_key, create_matcher, conf)
+end
+
+
function _M.init_downstream(session)
+ local conf = session.route.protocol.conf
+ if conf and conf.faults then
+ local matcher = get_matcher(conf, session.conn_ctx)
+ session.matcher = matcher
+ end
+
session.req_id_seq = 0
session.resp_id_seq = 0
return xrpc_socket.downstream.socket()
@@ -83,26 +142,53 @@ local function read_req(session, sk)
local cmd_line = core.tablepool.fetch("xrpc_redis_cmd_line", narg, 0)
- for i = 1, narg do
+ local n, err = read_len(sk)
+ if not n then
+ return nil, err
+ end
+
+ local p, err = sk:read(n + 2)
+ if not p then
+ return nil, err
+ end
+
+ local s = ffi_str(p, n)
+ cmd_line[1] = s
+
+ local key_finder
+ local matcher = session.matcher
+ if matcher then
+ matcher = matcher[s:lower()]
+ if matcher then
+ key_finder = commands.cmd_to_key_finder[s] or
commands.default_key_finder
+ end
+ end
+
+ for i = 2, narg do
+ local is_key = false
+ if key_finder then
+ is_key = key_finder(i, narg)
+ end
+
local n, err = read_len(sk)
if not n then
return nil, err
end
local s
- if n > 1024 then
+ if not is_key and n > MAX_VALUE_LEN then
-- avoid recording big value
- local p, err = sk:read(1024)
+ local p, err = sk:read(MAX_VALUE_LEN)
if not p then
return nil, err
end
- local ok, err = sk:drain(n - 1024 + 2)
+ local ok, err = sk:drain(n - MAX_VALUE_LEN + 2)
if not ok then
return nil, err
end
- s = ffi_str(p, 1024) .. "..."
+ s = ffi_str(p, MAX_VALUE_LEN) .. "...(" .. n .. " bytes)"
else
local p, err = sk:read(n + 2)
if not p then
@@ -110,6 +196,11 @@ local function read_req(session, sk)
end
s = ffi_str(p, n)
+
+ if is_key and matcher.keys[s] then
+ matcher = matcher.keys[s]
+ key_finder = nil
+ end
end
cmd_line[i] = s
@@ -121,6 +212,18 @@ local function read_req(session, sk)
ctx.cmd = ctx.cmd_line[1]
local pipelined = sk:has_pending_data()
+
+ if matcher then
+ if matcher.keys then
+ -- try to match any key of this command
+ matcher = matcher.keys["*"]
+ end
+
+ if matcher then
+ sleep(matcher.delay)
+ end
+ end
+
return true, nil, pipelined
end
diff --git a/apisix/stream/xrpc/protocols/redis/schema.lua b/apisix/stream/xrpc/protocols/redis/schema.lua
index 49a7a0e48..0b6c90c65 100644
--- a/apisix/stream/xrpc/protocols/redis/schema.lua
+++ b/apisix/stream/xrpc/protocols/redis/schema.lua
@@ -20,6 +20,31 @@ local core = require("apisix.core")
local schema = {
type = "object",
properties = {
+ -- optional list of fault injection rules
+ faults = {
+ type = "array",
+ minItems = 1,
+ items = {
+ type = "object",
+ properties = {
+ -- commands the rule applies to, matched case-insensitively
+ commands = {
+ type = "array",
+ minItems = 1,
+ items = {
+ type = "string"
+ },
+ },
+ -- when given, the rule only applies to commands using this key
+ key = {
+ type = "string",
+ minLength = 1,
+ },
+ delay = {
+ type = "number",
+ -- the value is passed to ngx.sleep(), which raises an error
+ -- on a negative duration
+ minimum = 0,
+ description = "additional delay in seconds",
+ }
+ },
+ required = {"commands", "delay"}
+ },
+ },
},
}
diff --git a/apisix/stream/xrpc/runner.lua b/apisix/stream/xrpc/runner.lua
index 8de3176ff..ea807f60a 100644
--- a/apisix/stream/xrpc/runner.lua
+++ b/apisix/stream/xrpc/runner.lua
@@ -28,9 +28,10 @@ local _M = {}
local function open_session(conn_ctx)
conn_ctx.xrpc_session = {
- _upstream_conf = conn_ctx.matched_upstream,
+ conn_ctx = conn_ctx,
+ route = conn_ctx.matched_route.value,
-- fields start with '_' should not be accessed by the protocol implementation
- _route = conn_ctx.matched_route.value,
+ _upstream_conf = conn_ctx.matched_upstream,
_ctxs = {},
}
return conn_ctx.xrpc_session
diff --git a/apisix/stream/xrpc/sdk.lua b/apisix/stream/xrpc/sdk.lua
index 9773cb36f..3e3f4557a 100644
--- a/apisix/stream/xrpc/sdk.lua
+++ b/apisix/stream/xrpc/sdk.lua
@@ -125,8 +125,8 @@ end
-- @treturn table the new router under the specific protocol
-- @treturn string the new router version
function _M.get_router(session, version)
- local protocol_name = session._route.protocol.name
- local id = session._route.id
+ local protocol_name = session.route.protocol.name
+ local id = session.route.id
local items, conf_version = router.routes()
if version == conf_version then
diff --git a/t/xrpc/redis.t b/t/xrpc/redis.t
index 52a5fa0f5..a3a9ec9ca 100644
--- a/t/xrpc/redis.t
+++ b/t/xrpc/redis.t
@@ -257,3 +257,311 @@ hget animals: bark
}
--- response_body
--- stream_conf_enable
+
+
+
+=== TEST 6: delay
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            -- "ping" and "time" take no key, so the "key" of the rule is
+            -- expected to be ignored
+            local code, body = t('/apisix/admin/stream_routes/1',
+                ngx.HTTP_PUT,
+                {
+                    protocol = {
+                        name = "redis",
+                        conf = {
+                            faults = {
+                                {delay = 0.01, key = "ignored", commands = {"Ping", "time"}}
+                            }
+                        }
+                    },
+                    upstream = {
+                        nodes = {
+                            ["127.0.0.1:6379"] = 1
+                        },
+                        type = "roundrobin"
+                    }
+                }
+            )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 7: hit
+--- config
+    location /t {
+        content_by_lua_block {
+            local redis = require "resty.redis"
+            local red = redis:new()
+
+            local ok, err = red:connect("127.0.0.1", $TEST_NGINX_REDIS_PORT)
+            if not ok then
+                ngx.say("failed to connect: ", err)
+                return
+            end
+
+            -- "ping" has to take at least 10ms
+            local start = ngx.now()
+            local res, err = red:ping()
+            if not res then
+                ngx.say(err)
+                return
+            end
+            local now = ngx.now()
+            -- use integer to bypass float point number precision problem
+            if math.ceil((now - start) * 1000) < 10 then
+                ngx.say(now, " ", start)
+                return
+            end
+            start = now
+
+            -- "time" has to take at least 10ms
+            local res, err = red:time()
+            if not res then
+                ngx.say(err)
+                return
+            end
+            local now = ngx.now()
+            if math.ceil((now - start) * 1000) < 10 then
+                ngx.say(now, " ", start)
+                return
+            end
+            start = now
+
+            -- pipelined requests: the whole pipeline has to take between
+            -- 20ms and 30ms
+            red:init_pipeline()
+            red:time()
+            red:time()
+            red:get("A")
+
+            local results, err = red:commit_pipeline()
+            if not results then
+                ngx.say("failed to commit: ", err)
+                return
+            end
+            local now = ngx.now()
+            local elapsed_ms = math.ceil((now - start) * 1000)
+            if elapsed_ms < 20 or elapsed_ms > 30 then
+                ngx.say(now, " ", start)
+                return
+            end
+
+            ngx.say("ok")
+        }
+    }
+--- response_body
+ok
+--- stream_conf_enable
+
+
+
+=== TEST 8: DFS match
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            -- one rule bound to the key "a" plus one rule without key which
+            -- shares the "get" command
+            local code, body = t('/apisix/admin/stream_routes/1',
+                ngx.HTTP_PUT,
+                {
+                    protocol = {
+                        name = "redis",
+                        conf = {
+                            faults = {
+                                {delay = 0.02, key = "a", commands = {"get"}},
+                                {delay = 0.01, commands = {"get", "set"}},
+                            }
+                        }
+                    },
+                    upstream = {
+                        nodes = {
+                            ["127.0.0.1:6379"] = 1
+                        },
+                        type = "roundrobin"
+                    }
+                }
+            )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 9: hit
+--- config
+    location /t {
+        content_by_lua_block {
+            local redis = require "resty.redis"
+            local red = redis:new()
+
+            local ok, err = red:connect("127.0.0.1", $TEST_NGINX_REDIS_PORT)
+            if not ok then
+                ngx.say("failed to connect: ", err)
+                return
+            end
+
+            -- "get a" has to take at least 20ms
+            local start = ngx.now()
+            local res, err = red:get("a")
+            if not res then
+                ngx.say(err)
+                return
+            end
+            local now = ngx.now()
+            if math.ceil((now - start) * 1000) < 20 then
+                ngx.say(now, " ", start)
+                return
+            end
+            start = now
+
+            -- "set a" has to take at least 10ms
+            local res, err = red:set("a", "a")
+            if not res then
+                ngx.say(err)
+                return
+            end
+            local now = ngx.now()
+            if math.ceil((now - start) * 1000) < 10 then
+                ngx.say(now, " ", start)
+                return
+            end
+            start = now
+
+            -- neither command uses the key "a": the whole pipeline has to
+            -- take between 20ms and 30ms
+            red:init_pipeline()
+            red:get("b")
+            red:set("A", "a")
+
+            local results, err = red:commit_pipeline()
+            if not results then
+                ngx.say("failed to commit: ", err)
+                return
+            end
+            local now = ngx.now()
+            local elapsed_ms = math.ceil((now - start) * 1000)
+            if elapsed_ms < 20 or elapsed_ms > 30 then
+                ngx.say(now, " ", start)
+                return
+            end
+
+            ngx.say("ok")
+        }
+    }
+--- response_body
+ok
+--- stream_conf_enable
+
+
+
+=== TEST 10: multi keys
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            -- rules for commands which accept several keys
+            local code, body = t('/apisix/admin/stream_routes/1',
+                ngx.HTTP_PUT,
+                {
+                    protocol = {
+                        name = "redis",
+                        conf = {
+                            faults = {
+                                {delay = 0.03, key = "b", commands = {"del"}},
+                                {delay = 0.02, key = "a", commands = {"mset"}},
+                                {delay = 0.01, key = "b", commands = {"mset"}},
+                            }
+                        }
+                    },
+                    upstream = {
+                        nodes = {
+                            ["127.0.0.1:6379"] = 1
+                        },
+                        type = "roundrobin"
+                    }
+                }
+            )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 11: hit
+--- config
+    location /t {
+        content_by_lua_block {
+            local redis = require "resty.redis"
+            local red = redis:new()
+
+            local ok, err = red:connect("127.0.0.1", $TEST_NGINX_REDIS_PORT)
+            if not ok then
+                ngx.say("failed to connect: ", err)
+                return
+            end
+
+            -- the key "a" comes before the key "b": at least 20ms
+            local start = ngx.now()
+            local res, err = red:mset("c", 1, "a", 2, "b", 3)
+            if not res then
+                ngx.say(err)
+                return
+            end
+            local now = ngx.now()
+            if math.ceil((now - start) * 1000) < 20 then
+                ngx.say(now, " ", start)
+                return
+            end
+            start = now
+
+            -- the key "b" comes before the key "a": between 10ms and 15ms
+            local res, err = red:mset("b", 2, "a", 3)
+            if not res then
+                ngx.say(err)
+                return
+            end
+            local now = ngx.now()
+            local elapsed_ms = math.ceil((now - start) * 1000)
+            if elapsed_ms < 10 or elapsed_ms > 15 then
+                ngx.say(now, " ", start)
+                return
+            end
+            start = now
+
+            -- "a" is a value here, not a key: no more than 5ms
+            local res, err = red:mset("c", "a")
+            if not res then
+                ngx.say(err)
+                return
+            end
+            local now = ngx.now()
+            if math.ceil((now - start) * 1000) > 5 then
+                ngx.say(now, " ", start)
+                return
+            end
+            start = now
+
+            -- "del" with the key "b" as the second key: at least 30ms
+            local res, err = red:del("a", "b")
+            if not res then
+                ngx.say(err)
+                return
+            end
+            local now = ngx.now()
+            if math.ceil((now - start) * 1000) < 30 then
+                ngx.say(now, " ", start)
+                return
+            end
+
+            ngx.say("ok")
+        }
+    }
+--- response_body
+ok
+--- stream_conf_enable