This is an automated email from the ASF dual-hosted git repository.

impactcn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-shenyu-nginx.git


The following commit(s) were added to refs/heads/main by this push:
     new e7de1ec  [type:fix] update for zookeeper  add watch. (#18)
e7de1ec is described below

commit e7de1ec1f4989536f80dc771c065f61f89edf2a1
Author: Sixh-PrFor <[email protected]>
AuthorDate: Wed Jul 6 22:13:44 2022 +0800

    [type:fix] update for zookeeper  add watch. (#18)
    
    * 1. commit zookeeper connection
    
    * Increase the processing of heartbeats
    
    * 1. Send a request to subscribe
    
    * 1. zookeeper watch event...
    
    * 1. zookeeper watch event...
    
    * 1. update zookeeper proto.
    
    * 1. update zookeeper proto.
    
    * update zookeeper proto.
    
    * update zookeeper proto.
    
    * Modify Chinese
    
    * Modify Chinese
    
    * Modify Chinese
    
    * update sessionId lower.
    
    * fix add watch.
    
    * fix add watch.
    
    * fix add watch.
    
    * modify Support for zookeeper README.md
---
 README.md                                    | 34 ++++++++++++-
 lib/shenyu/register/core/string.lua          |  1 -
 lib/shenyu/register/core/utils.lua           |  2 +-
 lib/shenyu/register/zookeeper.lua            |  6 ++-
 lib/shenyu/register/zookeeper/connection.lua | 13 +++--
 lib/shenyu/register/zookeeper/zk_client.lua  | 75 +++++++++++++++++-----------
 lib/shenyu/register/zookeeper/zk_cluster.lua | 13 ++---
 lib/shenyu/register/zookeeper/zk_const.lua   | 23 ++++-----
 8 files changed, 107 insertions(+), 60 deletions(-)

diff --git a/README.md b/README.md
index 6e98937..ddccd31 100644
--- a/README.md
+++ b/README.md
@@ -4,7 +4,7 @@ Apache ShenYu Nginx Module(Experimental)
 This module provided SDK to watch available ShenYu instance list as upstream 
nodes by Service Register Center for OpenResty.
 1. [ETCD](#greeting-etcd) (Supported)
 2. [Nacos](#greeting-nacos) (Supported)
-3. Zookeeper (TODO)
+3. [Zookeeper](#greeting-zookeeper) (Supported)
 4. Consul (TODO)
 
 In the cluster mode, Apache ShenYu supports the deployment of multiple ShenYu 
instances, which may have new instances joining or leaving at any time.
@@ -109,6 +109,38 @@ openresty -s reload
 
 Here is a completed 
[example](https://github.com/apache/incubator-shenyu-nginx/blob/main/example/nacos/nginx.conf)
 working with Nacos.
 
+## Greeting Zookeeper 
+Modify the Nginx configuration to create and initialize the ShenYu register and 
connect it to the target register center.
+The module listens for node changes via Zookeeper watch events. Here is an 
example of the Zookeeper configuration.
+```shell
+init_worker_by_lua_block {
+        local register = require("shenyu.register.zookeeper")
+        register.init({
+           servers = {"127.0.0.1:2181","127.0.0.1:2182"},
+           shenyu_storage = ngx.shared.shenyu_storage,
+           balancer_type = "roundrobin"
+        });
+    }
+```
+1. `servers`: the addresses of the Zookeeper cluster nodes.
+2. `balancer_type`: specifies the balancer. `chash` and `roundrobin` are 
supported.
+
+Modify the upstream so that its servers can be updated dynamically. This 
configuration synchronizes the ShenYu instance list with the register center 
and then picks one instance to handle each request.
+```shell
+ upstream shenyu {
+        server 0.0.0.1;
+        balancer_by_lua_block {
+            require("shenyu.register.zookeeper").pick_and_set_peer()
+        }
+    }
+```
+Finally, restart OpenResty.
+```shell
+openresty -s reload
+```
+Here is a complete 
[example](https://github.com/apache/incubator-shenyu-nginx/blob/main/example/zookeeper/nginx.conf)
 working with Zookeeper.
+
+
 ## Contributor and Support
 
 * [How to Contributor](https://shenyu.apache.org/community/contributor-guide)
diff --git a/lib/shenyu/register/core/string.lua 
b/lib/shenyu/register/core/string.lua
index 089b4b5..5d63c29 100644
--- a/lib/shenyu/register/core/string.lua
+++ b/lib/shenyu/register/core/string.lua
@@ -17,7 +17,6 @@
 local table_insert          = table.insert
 local _M                    ={}
 
-
 function _M.split(str, delimiter)
     if not str or str == "" then return {} end
     if not delimiter or delimiter == "" then return { str } end
diff --git a/lib/shenyu/register/core/utils.lua 
b/lib/shenyu/register/core/utils.lua
index 5f74206..65774d9 100644
--- a/lib/shenyu/register/core/utils.lua
+++ b/lib/shenyu/register/core/utils.lua
@@ -23,7 +23,7 @@ function _M.paras_host(host, delimiter)
 end
 
 function _M.long_to_hex_string(long)
-    return string.format("0X%06X", long)
+    return string.format("0x%06x", long)
 end
 
 -- table len
diff --git a/lib/shenyu/register/zookeeper.lua 
b/lib/shenyu/register/zookeeper.lua
index b7a3518..2ec94e9 100644
--- a/lib/shenyu/register/zookeeper.lua
+++ b/lib/shenyu/register/zookeeper.lua
@@ -15,9 +15,9 @@
 -- limitations under the License.
 --
 local zk_cluster = require("shenyu.register.zookeeper.zk_cluster")
-local ngx_balancer = require("ngx.balancer")
-local balancer = require("shenyu.register.balancer")
 local const = require("shenyu.register.zookeeper.zk_const")
+local balancer = require("shenyu.register.balancer")
+local ngx_balancer = require("ngx.balancer")
 local ngx_timer_at = ngx.timer.at
 local xpcall = xpcall
 local ngx_log = ngx.log
@@ -29,6 +29,7 @@ local _M = {
 }
 
 local function watch_data(data)
+    ---@type table
     local server_lists = {}
     -- body
     for index, value in ipairs(data) do
@@ -36,6 +37,7 @@ local function watch_data(data)
             server_lists[value] = 1
         end
     end
+    ---@type table
     local s_nodes = _M.nodes
     for host, index in pairs(server_lists) do
         if not s_nodes[host] then
diff --git a/lib/shenyu/register/zookeeper/connection.lua 
b/lib/shenyu/register/zookeeper/connection.lua
index 4076f91..3298724 100644
--- a/lib/shenyu/register/zookeeper/connection.lua
+++ b/lib/shenyu/register/zookeeper/connection.lua
@@ -23,14 +23,14 @@ local tbunpack = struct.tbunpack
 local ngx_log = ngx.log
 local _timeout = 60 * 1000
 local _M = {}
-local mt = {__index = _M}
+local mt = { __index = _M }
 
 function _M.new(self)
     local sock, err = tcp()
     if not tcp then
         return nil, err
     end
-    return setmetatable({sock = sock, timeout = _timeout}, mt)
+    return setmetatable({ sock = sock, timeout = _timeout }, mt)
 end
 
 function _M.connect(self, ip, port)
@@ -74,7 +74,7 @@ function _M.read_len(self)
     return len
 end
 
-function _M.read_headler(self)
+function _M.read_header(self)
     local len = self:read_len()
     local b, err = self:read(len)
     if not b then
@@ -110,17 +110,16 @@ function _M.set_keepalive(self, ...)
     if not sock then
         return nil, "not initialized"
     end
-
+    
     return sock:setkeepalive(...)
 end
 
 function _M.set_timeouts(self, connect_timeout, send_timeout, read_timeout)
     local sock = self.sock
     if not sock then
-        error("not initialized", 2)
-        return
+        return nil, "not initialized"
     end
-
+    
     sock:settimeouts(connect_timeout, send_timeout, read_timeout)
 end
 
diff --git a/lib/shenyu/register/zookeeper/zk_client.lua 
b/lib/shenyu/register/zookeeper/zk_client.lua
index ceb490b..0da8366 100644
--- a/lib/shenyu/register/zookeeper/zk_client.lua
+++ b/lib/shenyu/register/zookeeper/zk_client.lua
@@ -24,22 +24,25 @@ local exiting = ngx.worker.exiting
 local sleep = ngx.sleep
 local strlen = string.len
 local _timeout = 60 * 1000
-local _M = {}
-local mt = {__index = _M}
+local _M = {
+}
+local mt = { __index = _M }
 
 function _M.new(self)
+    --- @type table
     local conn_, err = connection:new()
     if not conn_ then
         return nil, "initialized connection error" .. err
     end
     conn_:set_timeout(_timeout)
     conn_:set_keepalive()
-    return setmetatable({conn = conn_}, mt)
+    return setmetatable({ conn = conn_, children_listener = {} }, mt)
 end
 
 function _M.connect(self, host)
     -- body
     local conn = self.conn
+    ---@type table
     local iptables = util.paras_host(host, ":")
     local ip = iptables[1]
     local port = iptables[2]
@@ -47,10 +50,10 @@ function _M.connect(self, host)
     if not byt then
         return nil, "connection error" .. host
     end
-    local bytes, err = proto:serialize(proto.request_header, 
proto.connect_request)
+    local bytes = proto:serialize(proto.request_header, proto.connect_request)
     local b, err = conn:write(bytes)
     if not b then
-        return nil, "connect error " .. ip + ":" .. port
+        return nil, "connection error " .. ip + ":" .. port
     end
     local len = conn:read_len()
     if not len then
@@ -62,7 +65,7 @@ function _M.connect(self, host)
     end
     local rsp = proto.connect_response:unpack(bytes, 1)
     if not rsp then
-        return nil, "error"
+        return nil, "read connection response error"
     end
     self.xid = 0
     local t = rsp.timeout
@@ -70,10 +73,7 @@ function _M.connect(self, host)
     self.ping_time = (t / 3) / 1000
     self.host = host
     self.session_id = rsp.session_id
-    local tostring =
-        "proto_ver:" ..
-        rsp.proto_ver ..
-            "," .. "timeout:" .. rsp.timeout .. "," .. "session_id:" .. 
util.long_to_hex_string(rsp.session_id)
+    local tostring = "proto_ver:" .. rsp.proto_ver .. "," .. "timeout:" .. 
rsp.timeout .. "," .. "session_id:" .. util.long_to_hex_string(rsp.session_id)
     ngx_log(ngx.INFO, tostring)
     return true, nil
 end
@@ -100,8 +100,8 @@ function _M._get_children(self, path, is_watch)
         return bytes, "write bytes error"
     end
     --  If other data is received, it means that the data of the _get_children 
command has not been received
-    ::continue::
-    local rsp_header, bytes, end_index = conn:read_headler()
+    :: continue ::
+    local rsp_header, bytes, end_index = conn:read_header()
     if not rsp_header then
         return nil, "read headler error"
     end
@@ -119,22 +119,45 @@ function _M._get_children(self, path, is_watch)
         }
     end
     if rsp_header.xid == const.XID_PING then
-        goto continue
+        goto
+        continue
     end
     return nil, "get_children error"
 end
 
-function _M.add_watch(self, path)
+function _M.add_watch(self, path, listener)
     -- body
     local d, e = self:_get_children(path, 1)
     if not d then
         return d, e
     end
     self.watch = true
+    if not self.children_listener[path] then
+        self.children_listener[path] = listener
+    end
     return d, nil
 end
 
-local function reply_read(self, callback)
+local function watch_event(self, event)
+    if not event then
+        return
+    end
+    local type = event.type
+    local path = event.paths[1]
+    if type == const.WATCH_NODE_CHILDREN_CHANGE
+            or type == const.WATCH_NODE_CREATED
+            or type == const.WATCH_NODE_DELETED then
+        local listener = self.children_listener[path]
+        if listener then
+            local d, e = self:add_watch(path, listener)
+            if d then
+                listener(d.path)
+            end
+        end
+    end
+end
+
+local function reply_read(self)
     local conn = self.conn
     local h = proto.request_header
     h.xid = const.XID_PING
@@ -142,29 +165,23 @@ local function reply_read(self, callback)
     local req = proto:serialize(h, proto.ping_request)
     local ok, err = conn:write(req)
     if ok then
-        local h, bytes, end_start = conn:read_headler()
+        local h, bytes, end_start = conn:read_header()
         if h.xid == const.XID_PING then
             ngx_log(
-                ngx.DEBUG,
-                "Got ping zookeeper response host:" ..
-                    self.host .. " for sessionId:" .. 
util.long_to_hex_string(self.session_id)
+                    ngx.DEBUG,
+                    "Got ping zookeeper response host:" ..
+                            self.host .. " for sessionId:" .. 
util.long_to_hex_string(self.session_id)
             )
         elseif h.xid == const.XID_WATCH_EVENT then
             --decoding
-            local watch_event = proto.watch_event:unpack(bytes, end_start)
-            -- local xid, done, err, type, state = unpack(">iliii", bytes)
-            -- local eventPath = unpack_strings(strsub(bytes, 25))
-            local t = watch_event.paths[1]
-            local d, e = self:add_watch("" .. t)
-            if d then
-                callback(d.path)
-            end
+            local data = proto.watch_event:unpack(bytes, end_start)
+            watch_event(self, data)
         end
     end
     return ok, err
 end
 
-function _M.watch_receive(self, callback)
+function _M.watch_receive(self)
     local last_time = 0
     while true do
         if exiting() then
@@ -173,7 +190,7 @@ function _M.watch_receive(self, callback)
         end
         local can_ping = now() - last_time > self.ping_time
         if can_ping then
-            local ok, err = reply_read(self, callback)
+            local ok, err = reply_read(self)
             if err then
                 return nil, err
             end
diff --git a/lib/shenyu/register/zookeeper/zk_cluster.lua 
b/lib/shenyu/register/zookeeper/zk_cluster.lua
index d13f4f3..30afc7b 100644
--- a/lib/shenyu/register/zookeeper/zk_cluster.lua
+++ b/lib/shenyu/register/zookeeper/zk_cluster.lua
@@ -30,6 +30,7 @@ function _M.connect(self)
         return nil, "servers is null"
     end
     -- initialize
+    ---@type
     local client, err = zkclient:new()
     if not client then
         ngx_log(ngx.ERR, "Failed to initialize zk Client" .. err)
@@ -62,23 +63,23 @@ function _M.get_children(self, path)
     return data, nil
 end
 
-local function _watch_receive(self, callback)
+local function _watch_receive(self)
     local client = self.client
     if not client then
         ngx_log(ngx.ERR, "conn not initialized")
     end
-    return client:watch_receive(callback)
+    return client:watch_receive()
 end
 
-function _M.add_watch(self, path, callback)
+function _M.add_watch(self, path, listener)
     local client = self.client
     if not client then
         ngx_log(ngx.ERR, "conn not initialized")
     end
-    local data, err = client:add_watch(path)
+    local data, err = client:add_watch(path,listener)
     if data then
-        callback(data.path)
-        return _watch_receive(self, callback)
+        listener(data.path)
+        return _watch_receive(self)
     end
     return data, err
 end
diff --git a/lib/shenyu/register/zookeeper/zk_const.lua 
b/lib/shenyu/register/zookeeper/zk_const.lua
index 2aa7e3a..d3ec42e 100644
--- a/lib/shenyu/register/zookeeper/zk_const.lua
+++ b/lib/shenyu/register/zookeeper/zk_const.lua
@@ -19,33 +19,30 @@ local _M = {}
 
 -- XID
 _M.XID_WATCH_EVENT = -1
-
 _M.XID_PING = -2
-
 _M.XID_SET_WATCHES = -8
-
 --op code
-
 _M.ZOO_GET_CHILDREN = 8
-
 _M.ZOO_PING_OP = 11
-
 _M.ZOO_GET_CHILDREN2 = 12
-
 _M.ZOO_SET_WATCHES = 101
-
 _M.ZOO_ADD_WATCH = 106
-
+--watch type
+_M.WATCH_NONE = -1
+_M.WATCH_NODE_CREATED = 1
+_M.WATCH_NODE_DELETED = 2
+_M.WATCH_NODE_DATA_CHANGE = 3
+_M.WATCH_NODE_CHILDREN_CHANGE = 4
+_M.WATCH_DATA_WATCH_REMOVE = 5
+_M.WATCH_CHILD_WATCH_REMOVE = 6
+--cus const.
 _M.ZK_WATCH_PATH = "/shenyu/register/instance";
-
 --Definition of error codes.
 local error_code = {
     "0", "-1", "-2", "-3", "-4", "-5", "-6", "-7", "-8", "-100",
     "-101", "-102", "-103", "-108", "-110", "-111", "-112",
     "-113", "-114", "-115", "-118", "-119" }
-
-
---errorcode
+--err message.
 local error_msg = {
     err0 = "Ok",
     err1 = "System error",

Reply via email to