This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 48032f144 chore: improve the implementation of pubsub module (#7043)
48032f144 is described below

commit 48032f144582e5ff4319a11a781b376c34ffaf16
Author: Zeping Bai <[email protected]>
AuthorDate: Fri May 13 20:40:12 2022 +0800

    chore: improve the implementation of pubsub module (#7043)
---
 .github/workflows/build.yml      |  4 +-
 .github/workflows/centos7-ci.yml |  4 +-
 apisix/core/pubsub.lua           | 95 +++++++++++++++++++++-------------------
 t/pubsub/pubsub.t                |  2 +-
 4 files changed, 56 insertions(+), 49 deletions(-)

diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 6714e4277..ea6eda1e3 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -31,8 +31,8 @@ jobs:
           - linux_openresty_1_17
         test_dir:
           - t/plugin
-          - t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc t/pubsub
-          - t/node t/router t/script t/stream-node t/utils t/wasm t/xds-library t/xrpc
+          - t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc
+          - t/node t/pubsub t/router t/script t/stream-node t/utils t/wasm t/xds-library t/xrpc
 
     runs-on: ${{ matrix.platform }}
     timeout-minutes: 90
diff --git a/.github/workflows/centos7-ci.yml b/.github/workflows/centos7-ci.yml
index 03ae393b0..a63622e7c 100644
--- a/.github/workflows/centos7-ci.yml
+++ b/.github/workflows/centos7-ci.yml
@@ -29,8 +29,8 @@ jobs:
       matrix:
         test_dir:
           - t/plugin
-          - t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc t/pubsub
-          - t/node t/router t/script t/stream-node t/utils t/wasm t/xds-library
+          - t/admin t/cli t/config-center-yaml t/control t/core t/debug t/discovery t/error_page t/misc
+          - t/node t/pubsub t/router t/script t/stream-node t/utils t/wasm t/xds-library
 
     steps:
     - name: Check out code
diff --git a/apisix/core/pubsub.lua b/apisix/core/pubsub.lua
index 14df27616..1f392818c 100644
--- a/apisix/core/pubsub.lua
+++ b/apisix/core/pubsub.lua
@@ -58,23 +58,42 @@ local function init_pb_state()
 end
 
 
+-- parse command name and parameters from client message
+local function get_cmd(data)
+    for key, value in pairs(data) do
+        -- There are sequence and command properties in the data,
+        -- select the handler according to the command value.
+        if key ~= "sequence" then
+            return key, value
+        end
+    end
+end
+
+
+-- send generic response to client
+local function send_resp(ws, sequence, data)
+    data.sequence = sequence
+    local ok, encoded = pcall(pb.encode, "PubSubResp", data)
+    if not ok or not encoded then
+        log.error("failed to encode response message, err: ", encoded)
+        return
+    end
+
+    local _, err = ws:send_binary(encoded)
+    if err then
+        log.error("failed to send response to client, err: ", err)
+    end
+end
+
+
 -- send error response to client
 local function send_error(ws, sequence, err_msg)
-    local ok, data = pcall(pb.encode, "PubSubResp", {
-        sequence = sequence,
+    return send_resp(ws, sequence, {
         error_resp = {
             code = 0,
             message = err_msg,
         },
     })
-    if not ok or not data then
-        log.error("failed to encode error response message, err: ", data)
-    end
-
-    local _, err = ws:send_binary(data)
-    if err then
-        log.error("failed to send response to client, err: ", err)
-    end
 end
 
 
@@ -119,8 +138,8 @@ end
 -- no error exists.
 --
 -- @function core.pubsub.on
--- @tparam string command to add callback
--- @tparam function handler callback on receipt of command
+-- @tparam string command The command to add callback.
+-- @tparam func handler The callback function on receipt of command.
 -- @usage
 -- pubsub:on(command, function (params)
 --     return data, err
@@ -180,40 +199,28 @@ function _M.wait(self)
         -- command sequence code
         local sequence = data.sequence
 
-        -- call command handler to generate response data
-        for key, value in pairs(data) do
-            -- There are sequence and command properties in the data,
-            -- select the handler according to the command value.
-            if key ~= "sequence" then
-                local handler = self.cmd_handler[key]
-                if not handler then
-                    log.error("pubsub callback handler not registered for the",
-                        " command, command: ", key)
-                    send_error(ws, sequence, "unknown command: " .. key)
-                    break
-                end
-
-                local resp, err = handler(value)
-                if not resp then
-                    send_error(ws, sequence, err)
-                    break
-                end
-
-                -- write back the sequence
-                resp.sequence = sequence
-                local ok, data = pcall(pb.encode, "PubSubResp", resp)
-                if not ok or not data then
-                    log.error("failed to encode response message, err: ", data)
-                    break
-                end
-                local _, err = ws:send_binary(data)
-                if err then
-                    log.error("failed to send response to client, err: ", err)
-                end
-                break
-            end
+        local cmd, params = get_cmd(data)
+        if not cmd and not params then
             log.warn("pubsub server receives empty command")
+            goto continue
+        end
+
+        -- find the handler for the current command
+        local handler = self.cmd_handler[cmd]
+        if not handler then
+            log.error("pubsub callback handler not registered for the",
+                " command, command: ", cmd)
+            send_error(ws, sequence, "unknown command")
+            goto continue
+        end
+
+        -- call command handler to generate response data
+        local resp, err = handler(params)
+        if not resp then
+            send_error(ws, sequence, err)
+            goto continue
         end
+        send_resp(ws, sequence, resp)
 
         ::continue::
     end
diff --git a/t/pubsub/pubsub.t b/t/pubsub/pubsub.t
index 84b80f532..5ce0b0b03 100644
--- a/t/pubsub/pubsub.t
+++ b/t/pubsub/pubsub.t
@@ -151,7 +151,7 @@ ret: error
         }
     }
 --- response_body
-unknown command: cmd_empty
+unknown command
 --- error_log
 pubsub callback handler not registered for the command, command: cmd_empty
 

Reply via email to