tzssangglass commented on code in PR #7028:
URL: https://github.com/apache/apisix/pull/7028#discussion_r870896543


##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,200 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Extensible framework to support publish-and-subscribe scenarios
+--
+-- @module core.pubsub
+
+local log          = require("apisix.core.log")
+local ws_server    = require("resty.websocket.server")
+local protoc       = require("protoc")
+local pb           = require("pb")
+local setmetatable = setmetatable
+local pcall        = pcall
+local pairs        = pairs
+
+
+local _M = { version = 0.1 }
+local mt = { __index = _M }
+
+local pb_state
+local function init_pb_state()
+    -- clear current pb state
+    pb.state(nil)
+
+    -- set int64 rule for pubsub module
+    pb.option("int64_as_string")
+
+    -- initialize protoc compiler
+    protoc.reload()
+    local pubsub_protoc = protoc.new()
+
+    -- compile the protobuf file on initial load module
+    -- ensure that each worker is loaded once
+    if not pubsub_protoc.loaded["pubsub.proto"] then
+        pubsub_protoc:addpath("apisix/include/apisix/model")
+        local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
+        if not ok then
+            pubsub_protoc:reset()
+            return "failed to load pubsub protocol: "..err

Review Comment:
   ```suggestion
               return "failed to load pubsub protocol: "..err
   ```
   ```suggestion
               return "failed to load pubsub protocol: " .. err
   ```



##########
docs/en/latest/pubsub.md:
##########
@@ -0,0 +1,244 @@
+---
+title: PubSub
+keywords:
+  - APISIX
+  - Pub-Sub
+description: This document contains information about the Apache APISIX pubsub framework.
+---
+
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+## What is Pub-Sub
+
+Publish-subscribe is a messaging paradigm in which message producers do not send messages directly to message consumers, but are relayed by a specific broker that caches messages sent by producers and then actively pushes them to subscribed consumers or pulls them by consumers. This pattern is often used in system architectures for system decoupling or to handle high traffic scenarios.

Review Comment:
   ```suggestion
   Publish-subscribe is a messaging paradigm:
   
   - Producers send messages to specific brokers rather than directly to consumers.
   - Brokers cache messages sent by producers and then either actively push them to subscribed consumers or let consumers pull them.
   
   System architectures often use this pattern to decouple components or to handle high-traffic scenarios.
   ```
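To make the push/pull distinction in the suggested wording concrete, here is a minimal in-memory sketch in plain Lua. `Broker`, `publish`, `subscribe`, and `fetch` are hypothetical names for illustration only; they are not APISIX or Kafka APIs, and a real broker would also persist messages and track consumer offsets.

```lua
-- Hypothetical in-memory broker: illustrates the paradigm, not an APISIX API.
local Broker = {}
Broker.__index = Broker

function Broker.new()
    return setmetatable({ topics = {}, subscribers = {} }, Broker)
end

-- Producers hand messages to the broker, never to consumers directly.
-- The broker caches each message and pushes it to current subscribers.
function Broker:publish(topic, msg)
    local queue = self.topics[topic] or {}
    self.topics[topic] = queue
    queue[#queue + 1] = msg

    for _, callback in ipairs(self.subscribers[topic] or {}) do
        callback(msg)
    end
end

-- Push model: the broker calls `callback` for every new message.
function Broker:subscribe(topic, callback)
    local subs = self.subscribers[topic] or {}
    self.subscribers[topic] = subs
    subs[#subs + 1] = callback
end

-- Pull model: a consumer reads up to `max` cached messages starting at
-- `offset` (1-based, like Lua arrays).
function Broker:fetch(topic, offset, max)
    local queue = self.topics[topic] or {}
    local result = {}
    for i = offset, math.min(offset + max - 1, #queue) do
        result[#result + 1] = queue[i]
    end
    return result
end

-- Usage: one pushed subscriber and one pulling consumer on the same topic.
local broker = Broker.new()
local pushed = {}
broker:subscribe("doc-edits", function(msg) pushed[#pushed + 1] = msg end)
broker:publish("doc-edits", "insert a")
broker:publish("doc-edits", "delete b")
local pulled = broker:fetch("doc-edits", 2, 10)
```

The subscriber receives both messages as they are published, while the pulling consumer decides for itself where to start reading.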



##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,200 @@
+
+--- Extensible framework to support publish-and-subscribe scenarios
+--
+-- @module core.pubsub
+
+local log          = require("apisix.core.log")
+local ws_server    = require("resty.websocket.server")
+local protoc       = require("protoc")
+local pb           = require("pb")
+local setmetatable = setmetatable
+local pcall        = pcall
+local pairs        = pairs
+
+
+local _M = { version = 0.1 }
+local mt = { __index = _M }
+
+local pb_state
+local function init_pb_state()
+    -- clear current pb state
+    pb.state(nil)
+
+    -- set int64 rule for pubsub module
+    pb.option("int64_as_string")
+
+    -- initialize protoc compiler
+    protoc.reload()
+    local pubsub_protoc = protoc.new()
+
+    -- compile the protobuf file on initial load module
+    -- ensure that each worker is loaded once
+    if not pubsub_protoc.loaded["pubsub.proto"] then
+        pubsub_protoc:addpath("apisix/include/apisix/model")
+        local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
+        if not ok then
+            pubsub_protoc:reset()
+            return "failed to load pubsub protocol: "..err
+        end
+    end
+
+    pb_state = pb.state(nil)
+end
+
+---
+-- Create pubsub module instance
+--
+-- @function core.pubsub.new
+-- @treturn pubsub module instance
+-- @treturn string|nil error message if present
+-- @usage
+-- local pubsub, err = core.pubsub.new()
+function _M.new()
+    if not pb_state then
+        local err = init_pb_state()
+        if err then
+            return nil, err
+        end
+    end
+
+    local ws, err = ws_server:new()
+    if not ws then
+        return nil, err
+    end
+
+    local obj = setmetatable({
+        ws_server = ws,
+        cmd_handler = {},
+    }, mt)
+
+    return obj
+end
+
+
+---
+-- Add command callbacks to pubsub module instances
+--
+-- The callback function prototype: function (params)
+-- The params in the parameters contain the data defined in the requested command.
+-- Its first return value is the data, which needs to contain the data needed for
+-- the particular resp, returns nil if an error exists.
+-- Its second return value is a string type error message, no need to return when
+-- no error exists.
+--
+-- @function core.pubsub.on
+-- @tparam string command to add callback
+-- @tparam function  func (params) callback on receipt of command
+-- @usage
+-- pubsub:on(command, function (params)
+--     return data, err
+-- end)
+function _M.on(self, command, handler)
+    self.cmd_handler[command] = handler
+end
+
+
+---
+-- Put the pubsub instance into an event loop, waiting to process client commands
+--
+-- @function core.pubsub.wait
+-- @usage
+-- local err = pubsub:wait()
+function _M.wait(self)
+    local fatal_err
+    local ws = self.ws_server
+    while true do
+        -- read raw data frames from websocket connection
+        local raw_data, raw_type, err = ws:recv_frame()
+        if err then
+            -- terminate the event loop when a fatal error occurs
+            if ws.fatal then
+                ws:send_close()
+                fatal_err = err
+                break
+            end
+
+            -- skip this loop for non-fatal errors
+            log.error("failed to receive websocket frame: ", err)
+            goto continue
+        end
+
+        -- handle client close connection
+        if raw_type == "close" then
+            ws:send_close()
+            return
+        end
+
+        -- the pub-sub messages use binary, if the message is not
+        -- binary, skip this message
+        if raw_type ~= "binary" then
+            log.warn("pubsub server receive non-binary data, type: ",
+                raw_type, ",data: ", raw_data)
+            goto continue
+        end
+
+        -- recovery of stored pb_store
+        pb.state(pb_state)
+
+        local data, err = pb.decode("PubSubReq", raw_data)
+        if not data then
+            log.error("pubsub server receives undecodable data, err: ", err)
+            goto continue
+        end
+
+        -- command sequence code
+        local sequence = data.sequence
+
+        -- call command handler to generate response data
+        for key, value in pairs(data) do
+            -- There are sequence and command properties in the data,
+            -- select the handler according to the command value.
+            if key ~= "sequence" then
+                local handler = self.cmd_handler[key]
+                if not handler then
+                    log.error("pubsub callback handler not registered for the",
+                        " this command, command: ", key)

Review Comment:
   ```suggestion
                       log.error("pubsub callback handler not registered for the",
                           " command: ", key)
   ```
   
   Would this be enough?
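To check how the suggested message reads at runtime, here is a self-contained sketch of the per-request dispatch step in `_M.wait`, in plain Lua. The `log` table is a stub standing in for `apisix.core.log`, and `dispatch` is a hypothetical helper; the reviewed loop sends the response with `ws:send_binary(pb.encode("PubSubResp", ...))` instead of returning it.

```lua
-- Stub for apisix.core.log that records messages instead of writing them.
local logged = {}
local log = {
    error = function(...)
        logged[#logged + 1] = table.concat({ ... })
    end,
}

-- Hypothetical helper mirroring the dispatch step of _M.wait: a decoded
-- PubSubReq holds `sequence` plus exactly one command field (the oneof).
local function dispatch(cmd_handler, data)
    local sequence = data.sequence
    for key, value in pairs(data) do
        if key ~= "sequence" then
            local handler = cmd_handler[key]
            if not handler then
                log.error("pubsub callback handler not registered for the",
                    " command: ", key)
                return nil
            end

            local resp, err = handler(value)
            if not resp then
                -- error path: reply with error_resp, as in the reviewed code
                return {
                    sequence = sequence,
                    error_resp = { code = 0, message = err },
                }
            end

            -- success path: write back the sequence so the client can match it
            resp.sequence = sequence
            return resp
        end
    end
end
```

With the suggested arguments, the unregistered-command case logs `pubsub callback handler not registered for the command: <key>`.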



##########
docs/en/latest/pubsub.md:
##########
@@ -0,0 +1,244 @@
+---
+title: PubSub
+keywords:
+  - APISIX
+  - Pub-Sub
+description: This document contains information about the Apache APISIX pubsub framework.
+---
+
+## What is Pub-Sub
+
+Publish-subscribe is a messaging paradigm in which message producers do not send messages directly to message consumers, but are relayed by a specific broker that caches messages sent by producers and then actively pushes them to subscribed consumers or pulls them by consumers. This pattern is often used in system architectures for system decoupling or to handle high traffic scenarios.
+
+In Apache APISIX, the most common scenario is for handling north-south traffic from the server to the client. If we can combine it with a publish-subscribe scenario, we can achieve more powerful features, such as real-time collaboration on online documents, online games, etc.
+
+## Architecture
+
+![pub-sub architecture](../../assets/images/pubsub-architecture.svg)
+
+Currently, Apache APISIX supports WebSocket communication with the client, which can be any application that supports WebSocket, with Protocol Buffer as the serialization mechanism, see the [protocol definition](../../../apisix/pubsub.proto).
+
+## Supported messaging systems
+
+- [Apache Kafka](pubsub/kafka.md)
+
+## How to support other messaging systems
+
+An extensible pubsub module is implemented in Apache APISIX, which is responsible for starting the WebSocket server, coding and decoding communication protocols, handling client commands, and through which new messaging system support can be simply added.

Review Comment:
   ```suggestion
   Apache APISIX implements an extensible pubsub module that is responsible for starting the WebSocket server, encoding and decoding communication protocols, and handling client commands. Support for new messaging systems can be added on top of this module.
   ```
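As an illustration of what such an extension looks like, here is a sketch of a command handler that follows the callback contract documented for `core.pubsub.on` (return the response data, or `nil` plus an error string). `fetch_messages` is a hypothetical backend stub and the `offset`/`value` message fields are assumptions; only `cmd_kafka_fetch` and `kafka_fetch_resp` come from the protocol example in this PR.

```lua
-- Hypothetical backend stub; a real implementation would query Kafka.
-- Offsets are strings because the module sets pb.option("int64_as_string").
local function fetch_messages(topic, partition, offset)
    if topic == nil or topic == "" then
        return nil, "topic is required"
    end
    return { { offset = offset, value = "hello" } }
end

-- Handler for `cmd_kafka_fetch`: `params` is the decoded CmdKafkaFetch
-- message; return the PubSubResp fields on success, or nil plus an error.
local function on_kafka_fetch(params)
    local messages, err = fetch_messages(params.topic, params.partition,
                                         params.offset)
    if not messages then
        return nil, err
    end
    return { kafka_fetch_resp = { messages = messages } }
end

-- Inside APISIX, registration would follow the @usage in core.pubsub:
--   local pubsub, err = core.pubsub.new()
--   pubsub:on("cmd_kafka_fetch", on_kafka_fetch)
--   pubsub:wait()
```

The command name passed to `on` has to match the `oneof` field name in `PubSubReq`, because `wait` looks handlers up by the decoded key.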



##########
docs/en/latest/pubsub.md:
##########
@@ -0,0 +1,244 @@
+---
+title: PubSub
+keywords:
+  - APISIX
+  - Pub-Sub
+description: This document contains information about the Apache APISIX pubsub framework.
+---
+
+## What is Pub-Sub
+
+Publish-subscribe is a messaging paradigm in which message producers do not send messages directly to message consumers, but are relayed by a specific broker that caches messages sent by producers and then actively pushes them to subscribed consumers or pulls them by consumers. This pattern is often used in system architectures for system decoupling or to handle high traffic scenarios.
+
+In Apache APISIX, the most common scenario is for handling north-south traffic from the server to the client. If we can combine it with a publish-subscribe scenario, we can achieve more powerful features, such as real-time collaboration on online documents, online games, etc.
+
+## Architecture
+
+![pub-sub architecture](../../assets/images/pubsub-architecture.svg)
+
+Currently, Apache APISIX supports WebSocket communication with the client, which can be any application that supports WebSocket, with Protocol Buffer as the serialization mechanism, see the [protocol definition](../../../apisix/pubsub.proto).
+
+## Supported messaging systems
+
+- [Apache Kafka](pubsub/kafka.md)
+
+## How to support other messaging systems
+
+An extensible pubsub module is implemented in Apache APISIX, which is responsible for starting the WebSocket server, coding and decoding communication protocols, handling client commands, and through which new messaging system support can be simply added.
+
+### Basic Steps
+
+- Add new commands and response body definitions to `pubsub.proto`
+- Add a new option to the `scheme` configuration item in upstream
+- Add a new `scheme` judgment branch to `http_access_phase`
+- Implement the required message system instruction processing functions
+- Optional: Create plugins to support advanced configurations of this messaging system
+
+### the example of Apache Kafka
+
+#### Add new commands and response body definitions to `pubsub.proto`
+
+The core of the protocol definition in `pubsub.proto` is the two parts `PubSubReq` and `PubSubResp`.
+
+First, create the `CmdKafkaFetch` command and add the required parameters. Then, register this command in the list of commands for `req` in `PubSubReq`, which is named `cmd_kafka_fetch`.
+
+```protobuf
+message CmdKafkaFetch {
+    string topic = 1;
+    int32 partition = 2;
+    int64 offset = 3;
+}
+
+message PubSubReq {
+    int64 sequence = 1;
+    oneof req {
+        CmdKafkaFetch cmd_kafka_fetch = 31;
+        // more commands
+    };
+}
+```
+
+Then create the corresponding response body `KafkaFetchResp` and register it in the `resp` of `PubSubResp`, named `kafka_fetch_resp`.
+
+```protobuf
+message KafkaFetchResp {
+    repeated KafkaMessage messages = 1;
+}
+
+message PubSubResp {
+    int64 sequence = 1;
+    oneof resp {
+        ErrorResp error_resp = 31;
+        KafkaFetchResp kafka_fetch_resp = 32;
+        // more responses
+    };
+}
+```
+
+#### Add a new option to the `scheme` configuration item in upstream
+
+Add a new option `kafka` to the `scheme` field enumeration in the `upstream` of `apisix/schema_def.lua`.
+
+```lua
+scheme = {
+    enum = {"grpc", "grpcs", "http", "https", "tcp", "tls", "udp", "kafka"},
+    -- other
+}
+```
+
+#### Add a new `scheme` judgment branch to `http_access_phase`
+
+Add a `scheme` judgment branch to the `http_access_phase` function in `apisix/init.lua` to support the processing of `kafka` type upstreams. Because of Apache Kafka has its own clustering and partition scheme, we do not need to use the Apache APISIX built-in load balancing algorithm, so we intercept and take over the processing flow before selecting the upstream node, here using the `kafka_access_phase` function.

Review Comment:
   ```suggestion
   Add a `scheme` judgment branch to the `http_access_phase` function in `apisix/init.lua` to support the processing of `kafka` type upstreams. Because Apache Kafka has its own clustering and partition scheme, we do not need the Apache APISIX built-in load balancing algorithm. Instead, we intercept and take over the processing flow before an upstream node is selected, using the `kafka_access_phase` function.
   ```
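A simplified sketch of the branch being described, in plain Lua. `kafka_access_phase` and `pick_server` here are stubs, and reading the scheme from `api_ctx.upstream_scheme` is an assumption about the context layout; the point is only that `kafka` upstreams return before the load balancer chooses any node.

```lua
-- Stubs standing in for the real phase handlers (hypothetical).
local function kafka_access_phase(api_ctx)
    api_ctx.handled_by = "kafka"
end

local function pick_server(api_ctx)
    api_ctx.handled_by = "load_balancer"
end

-- Simplified access phase: a `kafka` upstream is taken over before any
-- upstream node is selected, so the built-in balancer never runs for it.
local function access_phase(api_ctx)
    if api_ctx.upstream_scheme == "kafka" then
        return kafka_access_phase(api_ctx)
    end
    return pick_server(api_ctx)
end
```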



##########
apisix/core/pubsub.lua:
##########
@@ -0,0 +1,200 @@
+
+--- Extensible framework to support publish-and-subscribe scenarios
+--
+-- @module core.pubsub
+
+local log          = require("apisix.core.log")
+local ws_server    = require("resty.websocket.server")
+local protoc       = require("protoc")
+local pb           = require("pb")
+local setmetatable = setmetatable
+local pcall        = pcall
+local pairs        = pairs
+
+
+local _M = { version = 0.1 }
+local mt = { __index = _M }
+
+local pb_state
+local function init_pb_state()
+    -- clear current pb state
+    pb.state(nil)
+
+    -- set int64 rule for pubsub module
+    pb.option("int64_as_string")
+
+    -- initialize protoc compiler
+    protoc.reload()
+    local pubsub_protoc = protoc.new()
+
+    -- compile the protobuf file on initial load module
+    -- ensure that each worker is loaded once
+    if not pubsub_protoc.loaded["pubsub.proto"] then
+        pubsub_protoc:addpath("apisix/include/apisix/model")
+        local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto")
+        if not ok then
+            pubsub_protoc:reset()
+            return "failed to load pubsub protocol: "..err
+        end
+    end
+
+    pb_state = pb.state(nil)
+end
+
+---
+-- Create pubsub module instance
+--
+-- @function core.pubsub.new
+-- @treturn pubsub module instance
+-- @treturn string|nil error message if present
+-- @usage
+-- local pubsub, err = core.pubsub.new()
+function _M.new()
+    if not pb_state then
+        local err = init_pb_state()
+        if err then
+            return nil, err
+        end
+    end
+
+    local ws, err = ws_server:new()
+    if not ws then
+        return nil, err
+    end
+
+    local obj = setmetatable({
+        ws_server = ws,
+        cmd_handler = {},
+    }, mt)
+
+    return obj
+end
+
+
+---
+-- Add command callbacks to pubsub module instances
+--
+-- The callback function prototype: function (params)
+-- The params in the parameters contain the data defined in the requested command.
+-- Its first return value is the data, which needs to contain the data needed for
+-- the particular resp, returns nil if an error exists.
+-- Its second return value is a string type error message, no need to return when
+-- no error exists.
+--
+-- @function core.pubsub.on
+-- @tparam string command to add callback
+-- @tparam function  func (params) callback on receipt of command
+-- @usage
+-- pubsub:on(command, function (params)
+--     return data, err
+-- end)
+function _M.on(self, command, handler)
+    self.cmd_handler[command] = handler
+end
+
+
+---
+-- Put the pubsub instance into an event loop, waiting to process client commands
+--
+-- @function core.pubsub.wait
+-- @usage
+-- local err = pubsub:wait()
+function _M.wait(self)
+    local fatal_err
+    local ws = self.ws_server
+    while true do
+        -- read raw data frames from websocket connection
+        local raw_data, raw_type, err = ws:recv_frame()
+        if err then
+            -- terminate the event loop when a fatal error occurs
+            if ws.fatal then
+                ws:send_close()
+                fatal_err = err
+                break
+            end
+
+            -- skip this loop for non-fatal errors
+            log.error("failed to receive websocket frame: ", err)
+            goto continue
+        end
+
+        -- handle client close connection
+        if raw_type == "close" then
+            ws:send_close()
+            return
+        end
+
+        -- the pub-sub messages use binary, if the message is not
+        -- binary, skip this message
+        if raw_type ~= "binary" then
+            log.warn("pubsub server receive non-binary data, type: ",
+                raw_type, ",data: ", raw_data)
+            goto continue
+        end
+
+        -- recovery of stored pb_store
+        pb.state(pb_state)
+
+        local data, err = pb.decode("PubSubReq", raw_data)
+        if not data then
+            log.error("pubsub server receives undecodable data, err: ", err)
+            goto continue
+        end
+
+        -- command sequence code
+        local sequence = data.sequence
+
+        -- call command handler to generate response data
+        for key, value in pairs(data) do
+            -- There are sequence and command properties in the data,
+            -- select the handler according to the command value.
+            if key ~= "sequence" then
+                local handler = self.cmd_handler[key]
+                if not handler then
+                    log.error("pubsub callback handler not registered for the",
+                        " this command, command: ", key)
+                    goto continue
+                end
+
+                local resp, err = handler(value)
+                if not resp then
+                    ws:send_binary(pb.encode("PubSubResp", {
+                        sequence = sequence,
+                        error_resp = {
+                            code = 0,
+                            message = err,
+                        },
+                    }))
+                    goto continue
+                end
+
+                -- write back the sequence
+                resp.sequence = sequence
+                ws:send_binary(pb.encode("PubSubResp", resp))
+            end
+        end
+
+        ::continue::
+    end
+
+    log.error("failed to handle pub-sub command, err: websocket server: ", fatal_err)

Review Comment:
   This log looks a bit confusing.



##########
docs/en/latest/pubsub.md:
##########
@@ -0,0 +1,244 @@
+---
+title: PubSub
+keywords:
+  - APISIX
+  - Pub-Sub
+description: This document contains information about the Apache APISIX pubsub framework.
+---
+
+## What is Pub-Sub
+
+Publish-subscribe is a messaging paradigm in which message producers do not send messages directly to message consumers, but are relayed by a specific broker that caches messages sent by producers and then actively pushes them to subscribed consumers or pulls them by consumers. This pattern is often used in system architectures for system decoupling or to handle high traffic scenarios.
+
+In Apache APISIX, the most common scenario is for handling north-south traffic from the server to the client. If we can combine it with a publish-subscribe scenario, we can achieve more powerful features, such as real-time collaboration on online documents, online games, etc.

Review Comment:
   ```suggestion
   In Apache APISIX, the most common scenario is handling north-south traffic from the server to the client. By combining it with a publish-subscribe system, we can achieve more powerful features, such as real-time collaboration on online documents and online games.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
