This is an automated email from the ASF dual-hosted git repository.
spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new 48032f144 chore: improve the implementation of pubsub module (#7043)
48032f144 is described below
commit 48032f144582e5ff4319a11a781b376c34ffaf16
Author: Zeping Bai <[email protected]>
AuthorDate: Fri May 13 20:40:12 2022 +0800
chore: improve the implementation of pubsub module (#7043)
---
.github/workflows/build.yml | 4 +-
.github/workflows/centos7-ci.yml | 4 +-
apisix/core/pubsub.lua | 95 +++++++++++++++++++++-------------------
t/pubsub/pubsub.t | 2 +-
4 files changed, 56 insertions(+), 49 deletions(-)
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 6714e4277..ea6eda1e3 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -31,8 +31,8 @@ jobs:
- linux_openresty_1_17
test_dir:
- t/plugin
- - t/admin t/cli t/config-center-yaml t/control t/core t/debug
t/discovery t/error_page t/misc t/pubsub
- - t/node t/router t/script t/stream-node t/utils t/wasm
t/xds-library t/xrpc
+ - t/admin t/cli t/config-center-yaml t/control t/core t/debug
t/discovery t/error_page t/misc
+ - t/node t/pubsub t/router t/script t/stream-node t/utils t/wasm
t/xds-library t/xrpc
runs-on: ${{ matrix.platform }}
timeout-minutes: 90
diff --git a/.github/workflows/centos7-ci.yml b/.github/workflows/centos7-ci.yml
index 03ae393b0..a63622e7c 100644
--- a/.github/workflows/centos7-ci.yml
+++ b/.github/workflows/centos7-ci.yml
@@ -29,8 +29,8 @@ jobs:
matrix:
test_dir:
- t/plugin
- - t/admin t/cli t/config-center-yaml t/control t/core t/debug
t/discovery t/error_page t/misc t/pubsub
- - t/node t/router t/script t/stream-node t/utils t/wasm t/xds-library
+ - t/admin t/cli t/config-center-yaml t/control t/core t/debug
t/discovery t/error_page t/misc
+ - t/node t/pubsub t/router t/script t/stream-node t/utils t/wasm
t/xds-library
steps:
- name: Check out code
diff --git a/apisix/core/pubsub.lua b/apisix/core/pubsub.lua
index 14df27616..1f392818c 100644
--- a/apisix/core/pubsub.lua
+++ b/apisix/core/pubsub.lua
@@ -58,23 +58,42 @@ local function init_pb_state()
end
+-- parse command name and parameters from client message
+local function get_cmd(data)
+ for key, value in pairs(data) do
+ -- There are sequence and command properties in the data,
+ -- select the handler according to the command value.
+ if key ~= "sequence" then
+ return key, value
+ end
+ end
+end
+
+
+-- send generic response to client
+local function send_resp(ws, sequence, data)
+ data.sequence = sequence
+ local ok, encoded = pcall(pb.encode, "PubSubResp", data)
+ if not ok or not encoded then
+ log.error("failed to encode response message, err: ", encoded)
+ return
+ end
+
+ local _, err = ws:send_binary(encoded)
+ if err then
+ log.error("failed to send response to client, err: ", err)
+ end
+end
+
+
-- send error response to client
local function send_error(ws, sequence, err_msg)
- local ok, data = pcall(pb.encode, "PubSubResp", {
- sequence = sequence,
+ return send_resp(ws, sequence, {
error_resp = {
code = 0,
message = err_msg,
},
})
- if not ok or not data then
- log.error("failed to encode error response message, err: ", data)
- end
-
- local _, err = ws:send_binary(data)
- if err then
- log.error("failed to send response to client, err: ", err)
- end
end
@@ -119,8 +138,8 @@ end
-- no error exists.
--
-- @function core.pubsub.on
--- @tparam string command to add callback
--- @tparam function handler callback on receipt of command
+-- @tparam string command The command to add callback.
+-- @tparam func handler The callback function on receipt of command.
-- @usage
-- pubsub:on(command, function (params)
-- return data, err
@@ -180,40 +199,28 @@ function _M.wait(self)
-- command sequence code
local sequence = data.sequence
- -- call command handler to generate response data
- for key, value in pairs(data) do
- -- There are sequence and command properties in the data,
- -- select the handler according to the command value.
- if key ~= "sequence" then
- local handler = self.cmd_handler[key]
- if not handler then
- log.error("pubsub callback handler not registered for the",
- " command, command: ", key)
- send_error(ws, sequence, "unknown command: " .. key)
- break
- end
-
- local resp, err = handler(value)
- if not resp then
- send_error(ws, sequence, err)
- break
- end
-
- -- write back the sequence
- resp.sequence = sequence
- local ok, data = pcall(pb.encode, "PubSubResp", resp)
- if not ok or not data then
- log.error("failed to encode response message, err: ", data)
- break
- end
- local _, err = ws:send_binary(data)
- if err then
- log.error("failed to send response to client, err: ", err)
- end
- break
- end
+ local cmd, params = get_cmd(data)
+ if not cmd and not params then
log.warn("pubsub server receives empty command")
+ goto continue
+ end
+
+ -- find the handler for the current command
+ local handler = self.cmd_handler[cmd]
+ if not handler then
+ log.error("pubsub callback handler not registered for the",
+ " command, command: ", cmd)
+ send_error(ws, sequence, "unknown command")
+ goto continue
+ end
+
+ -- call command handler to generate response data
+ local resp, err = handler(params)
+ if not resp then
+ send_error(ws, sequence, err)
+ goto continue
end
+ send_resp(ws, sequence, resp)
::continue::
end
diff --git a/t/pubsub/pubsub.t b/t/pubsub/pubsub.t
index 84b80f532..5ce0b0b03 100644
--- a/t/pubsub/pubsub.t
+++ b/t/pubsub/pubsub.t
@@ -151,7 +151,7 @@ ret: error
}
}
--- response_body
-unknown command: cmd_empty
+unknown command
--- error_log
pubsub callback handler not registered for the command, command: cmd_empty