This is an automated email from the ASF dual-hosted git repository.
impactcn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-shenyu-nginx.git
The following commit(s) were added to refs/heads/main by this push:
new e7de1ec [type:fix] update for zookeeper add watch. (#18)
e7de1ec is described below
commit e7de1ec1f4989536f80dc771c065f61f89edf2a1
Author: Sixh-PrFor <[email protected]>
AuthorDate: Wed Jul 6 22:13:44 2022 +0800
[type:fix] update for zookeeper add watch. (#18)
* 1. commit zookeeper connection
* Increase the processing of heartbeats
* 1. Send a request to subscribe
* 1. zookeeper watch event...
* 1. zookeeper watch event...
* 1. update zookeeper proto.
* 1. update zookeeper proto.
* update zookeeper proto.
* update zookeeper proto.
* Modify Chinese
* Modify Chinese
* Modify Chinese
* update sessionId lower.
* fix add watch.
* fix add watch.
* fix add watch.
* modify Support for zookeeper README.md
---
README.md | 34 ++++++++++++-
lib/shenyu/register/core/string.lua | 1 -
lib/shenyu/register/core/utils.lua | 2 +-
lib/shenyu/register/zookeeper.lua | 6 ++-
lib/shenyu/register/zookeeper/connection.lua | 13 +++--
lib/shenyu/register/zookeeper/zk_client.lua | 75 +++++++++++++++++-----------
lib/shenyu/register/zookeeper/zk_cluster.lua | 13 ++---
lib/shenyu/register/zookeeper/zk_const.lua | 23 ++++-----
8 files changed, 107 insertions(+), 60 deletions(-)
diff --git a/README.md b/README.md
index 6e98937..ddccd31 100644
--- a/README.md
+++ b/README.md
@@ -4,7 +4,7 @@ Apache ShenYu Nginx Module(Experimental)
This module provided SDK to watch available ShenYu instance list as upstream
nodes by Service Register Center for OpenResty.
1. [ETCD](#greeting-etcd) (Supported)
2. [Nacos](#greeting-nacos) (Supported)
-3. Zookeeper (TODO)
+3. [Zookeeper](#greeting-zookeeper) (Supported)
4. Consul (TODO)
In the cluster mode, Apache ShenYu supports the deployment of multiple ShenYu
instances, which may have new instances joining or leaving at any time.
@@ -109,6 +109,38 @@ openresty -s reload
Here is a completed
[example](https://github.com/apache/incubator-shenyu-nginx/blob/main/example/nacos/nginx.conf)
working with Nacos.
+## Greeting Zookeeper
+Modify the Nginx configuration to create and initialize the ShenYu register so that it connects to the target register center.
+The register listens for node changes via Zookeeper watch events. Here is an example of the Zookeeper configuration.
+```shell
+init_worker_by_lua_block {
+ local register = require("shenyu.register.zookeeper")
+ register.init({
+ servers = {"127.0.0.1:2181","127.0.0.1:2182"},
+ shenyu_storage = ngx.shared.shenyu_storage,
+ balancer_type = "roundrobin"
+ });
+ }
+```
+1. `servers` is the list of Zookeeper cluster addresses.
+2. `balancer_type` specifies the balancer. Supported values are `chash` and `roundrobin`.
+
+Modify the upstream to enable dynamic updates of the upstream servers. In this case, the ShenYu instance list is synchronized with the register center, and one instance is then picked to handle each request.
+```shell
+ upstream shenyu {
+ server 0.0.0.1;
+ balancer_by_lua_block {
+ require("shenyu.register.zookeeper").pick_and_set_peer()
+ }
+ }
+```
+Finally, restart OpenResty.
+```shell
+openresty -s reload
+```
+Here is a complete [example](https://github.com/apache/incubator-shenyu-nginx/blob/main/example/zookeeper/nginx.conf) working with Zookeeper.
+
+
## Contributor and Support
* [How to Contributor](https://shenyu.apache.org/community/contributor-guide)
diff --git a/lib/shenyu/register/core/string.lua
b/lib/shenyu/register/core/string.lua
index 089b4b5..5d63c29 100644
--- a/lib/shenyu/register/core/string.lua
+++ b/lib/shenyu/register/core/string.lua
@@ -17,7 +17,6 @@
local table_insert = table.insert
local _M ={}
-
function _M.split(str, delimiter)
if not str or str == "" then return {} end
if not delimiter or delimiter == "" then return { str } end
diff --git a/lib/shenyu/register/core/utils.lua
b/lib/shenyu/register/core/utils.lua
index 5f74206..65774d9 100644
--- a/lib/shenyu/register/core/utils.lua
+++ b/lib/shenyu/register/core/utils.lua
@@ -23,7 +23,7 @@ function _M.paras_host(host, delimiter)
end
function _M.long_to_hex_string(long)
- return string.format("0X%06X", long)
+ return string.format("0x%06x", long)
end
-- table len
diff --git a/lib/shenyu/register/zookeeper.lua
b/lib/shenyu/register/zookeeper.lua
index b7a3518..2ec94e9 100644
--- a/lib/shenyu/register/zookeeper.lua
+++ b/lib/shenyu/register/zookeeper.lua
@@ -15,9 +15,9 @@
-- limitations under the License.
--
local zk_cluster = require("shenyu.register.zookeeper.zk_cluster")
-local ngx_balancer = require("ngx.balancer")
-local balancer = require("shenyu.register.balancer")
local const = require("shenyu.register.zookeeper.zk_const")
+local balancer = require("shenyu.register.balancer")
+local ngx_balancer = require("ngx.balancer")
local ngx_timer_at = ngx.timer.at
local xpcall = xpcall
local ngx_log = ngx.log
@@ -29,6 +29,7 @@ local _M = {
}
local function watch_data(data)
+ ---@type table
local server_lists = {}
-- body
for index, value in ipairs(data) do
@@ -36,6 +37,7 @@ local function watch_data(data)
server_lists[value] = 1
end
end
+ ---@type table
local s_nodes = _M.nodes
for host, index in pairs(server_lists) do
if not s_nodes[host] then
diff --git a/lib/shenyu/register/zookeeper/connection.lua
b/lib/shenyu/register/zookeeper/connection.lua
index 4076f91..3298724 100644
--- a/lib/shenyu/register/zookeeper/connection.lua
+++ b/lib/shenyu/register/zookeeper/connection.lua
@@ -23,14 +23,14 @@ local tbunpack = struct.tbunpack
local ngx_log = ngx.log
local _timeout = 60 * 1000
local _M = {}
-local mt = {__index = _M}
+local mt = { __index = _M }
function _M.new(self)
local sock, err = tcp()
if not tcp then
return nil, err
end
- return setmetatable({sock = sock, timeout = _timeout}, mt)
+ return setmetatable({ sock = sock, timeout = _timeout }, mt)
end
function _M.connect(self, ip, port)
@@ -74,7 +74,7 @@ function _M.read_len(self)
return len
end
-function _M.read_headler(self)
+function _M.read_header(self)
local len = self:read_len()
local b, err = self:read(len)
if not b then
@@ -110,17 +110,16 @@ function _M.set_keepalive(self, ...)
if not sock then
return nil, "not initialized"
end
-
+
return sock:setkeepalive(...)
end
function _M.set_timeouts(self, connect_timeout, send_timeout, read_timeout)
local sock = self.sock
if not sock then
- error("not initialized", 2)
- return
+ return nil, "not initialized"
end
-
+
sock:settimeouts(connect_timeout, send_timeout, read_timeout)
end
diff --git a/lib/shenyu/register/zookeeper/zk_client.lua
b/lib/shenyu/register/zookeeper/zk_client.lua
index ceb490b..0da8366 100644
--- a/lib/shenyu/register/zookeeper/zk_client.lua
+++ b/lib/shenyu/register/zookeeper/zk_client.lua
@@ -24,22 +24,25 @@ local exiting = ngx.worker.exiting
local sleep = ngx.sleep
local strlen = string.len
local _timeout = 60 * 1000
-local _M = {}
-local mt = {__index = _M}
+local _M = {
+}
+local mt = { __index = _M }
function _M.new(self)
+ --- @type table
local conn_, err = connection:new()
if not conn_ then
return nil, "initialized connection error" .. err
end
conn_:set_timeout(_timeout)
conn_:set_keepalive()
- return setmetatable({conn = conn_}, mt)
+ return setmetatable({ conn = conn_, children_listener = {} }, mt)
end
function _M.connect(self, host)
-- body
local conn = self.conn
+ ---@type table
local iptables = util.paras_host(host, ":")
local ip = iptables[1]
local port = iptables[2]
@@ -47,10 +50,10 @@ function _M.connect(self, host)
if not byt then
return nil, "connection error" .. host
end
- local bytes, err = proto:serialize(proto.request_header,
proto.connect_request)
+ local bytes = proto:serialize(proto.request_header, proto.connect_request)
local b, err = conn:write(bytes)
if not b then
- return nil, "connect error " .. ip + ":" .. port
+ return nil, "connection error " .. ip + ":" .. port
end
local len = conn:read_len()
if not len then
@@ -62,7 +65,7 @@ function _M.connect(self, host)
end
local rsp = proto.connect_response:unpack(bytes, 1)
if not rsp then
- return nil, "error"
+ return nil, "read connection response error"
end
self.xid = 0
local t = rsp.timeout
@@ -70,10 +73,7 @@ function _M.connect(self, host)
self.ping_time = (t / 3) / 1000
self.host = host
self.session_id = rsp.session_id
- local tostring =
- "proto_ver:" ..
- rsp.proto_ver ..
- "," .. "timeout:" .. rsp.timeout .. "," .. "session_id:" ..
util.long_to_hex_string(rsp.session_id)
+ local tostring = "proto_ver:" .. rsp.proto_ver .. "," .. "timeout:" ..
rsp.timeout .. "," .. "session_id:" .. util.long_to_hex_string(rsp.session_id)
ngx_log(ngx.INFO, tostring)
return true, nil
end
@@ -100,8 +100,8 @@ function _M._get_children(self, path, is_watch)
return bytes, "write bytes error"
end
-- If other data is received, it means that the data of the _get_children
command has not been received
- ::continue::
- local rsp_header, bytes, end_index = conn:read_headler()
+ :: continue ::
+ local rsp_header, bytes, end_index = conn:read_header()
if not rsp_header then
return nil, "read headler error"
end
@@ -119,22 +119,45 @@ function _M._get_children(self, path, is_watch)
}
end
if rsp_header.xid == const.XID_PING then
- goto continue
+ goto
+ continue
end
return nil, "get_children error"
end
-function _M.add_watch(self, path)
+function _M.add_watch(self, path, listener)
-- body
local d, e = self:_get_children(path, 1)
if not d then
return d, e
end
self.watch = true
+ if not self.children_listener[path] then
+ self.children_listener[path] = listener
+ end
return d, nil
end
-local function reply_read(self, callback)
+local function watch_event(self, event)
+ if not event then
+ return
+ end
+ local type = event.type
+ local path = event.paths[1]
+ if type == const.WATCH_NODE_CHILDREN_CHANGE
+ or type == const.WATCH_NODE_CREATED
+ or type == const.WATCH_NODE_DELETED then
+ local listener = self.children_listener[path]
+ if listener then
+ local d, e = self:add_watch(path, listener)
+ if d then
+ listener(d.path)
+ end
+ end
+ end
+end
+
+local function reply_read(self)
local conn = self.conn
local h = proto.request_header
h.xid = const.XID_PING
@@ -142,29 +165,23 @@ local function reply_read(self, callback)
local req = proto:serialize(h, proto.ping_request)
local ok, err = conn:write(req)
if ok then
- local h, bytes, end_start = conn:read_headler()
+ local h, bytes, end_start = conn:read_header()
if h.xid == const.XID_PING then
ngx_log(
- ngx.DEBUG,
- "Got ping zookeeper response host:" ..
- self.host .. " for sessionId:" ..
util.long_to_hex_string(self.session_id)
+ ngx.DEBUG,
+ "Got ping zookeeper response host:" ..
+ self.host .. " for sessionId:" ..
util.long_to_hex_string(self.session_id)
)
elseif h.xid == const.XID_WATCH_EVENT then
--decoding
- local watch_event = proto.watch_event:unpack(bytes, end_start)
- -- local xid, done, err, type, state = unpack(">iliii", bytes)
- -- local eventPath = unpack_strings(strsub(bytes, 25))
- local t = watch_event.paths[1]
- local d, e = self:add_watch("" .. t)
- if d then
- callback(d.path)
- end
+ local data = proto.watch_event:unpack(bytes, end_start)
+ watch_event(self, data)
end
end
return ok, err
end
-function _M.watch_receive(self, callback)
+function _M.watch_receive(self)
local last_time = 0
while true do
if exiting() then
@@ -173,7 +190,7 @@ function _M.watch_receive(self, callback)
end
local can_ping = now() - last_time > self.ping_time
if can_ping then
- local ok, err = reply_read(self, callback)
+ local ok, err = reply_read(self)
if err then
return nil, err
end
diff --git a/lib/shenyu/register/zookeeper/zk_cluster.lua
b/lib/shenyu/register/zookeeper/zk_cluster.lua
index d13f4f3..30afc7b 100644
--- a/lib/shenyu/register/zookeeper/zk_cluster.lua
+++ b/lib/shenyu/register/zookeeper/zk_cluster.lua
@@ -30,6 +30,7 @@ function _M.connect(self)
return nil, "servers is null"
end
-- initialize
+ ---@type table
local client, err = zkclient:new()
if not client then
ngx_log(ngx.ERR, "Failed to initialize zk Client" .. err)
@@ -62,23 +63,23 @@ function _M.get_children(self, path)
return data, nil
end
-local function _watch_receive(self, callback)
+local function _watch_receive(self)
local client = self.client
if not client then
ngx_log(ngx.ERR, "conn not initialized")
end
- return client:watch_receive(callback)
+ return client:watch_receive()
end
-function _M.add_watch(self, path, callback)
+function _M.add_watch(self, path, listener)
local client = self.client
if not client then
ngx_log(ngx.ERR, "conn not initialized")
end
- local data, err = client:add_watch(path)
+ local data, err = client:add_watch(path,listener)
if data then
- callback(data.path)
- return _watch_receive(self, callback)
+ listener(data.path)
+ return _watch_receive(self)
end
return data, err
end
diff --git a/lib/shenyu/register/zookeeper/zk_const.lua
b/lib/shenyu/register/zookeeper/zk_const.lua
index 2aa7e3a..d3ec42e 100644
--- a/lib/shenyu/register/zookeeper/zk_const.lua
+++ b/lib/shenyu/register/zookeeper/zk_const.lua
@@ -19,33 +19,30 @@ local _M = {}
-- XID
_M.XID_WATCH_EVENT = -1
-
_M.XID_PING = -2
-
_M.XID_SET_WATCHES = -8
-
--op code
-
_M.ZOO_GET_CHILDREN = 8
-
_M.ZOO_PING_OP = 11
-
_M.ZOO_GET_CHILDREN2 = 12
-
_M.ZOO_SET_WATCHES = 101
-
_M.ZOO_ADD_WATCH = 106
-
+--watch type
+_M.WATCH_NONE = -1
+_M.WATCH_NODE_CREATED = 1
+_M.WATCH_NODE_DELETED = 2
+_M.WATCH_NODE_DATA_CHANGE = 3
+_M.WATCH_NODE_CHILDREN_CHANGE = 4
+_M.WATCH_DATA_WATCH_REMOVE = 5
+_M.WATCH_CHILD_WATCH_REMOVE = 6
+--custom constants.
_M.ZK_WATCH_PATH = "/shenyu/register/instance";
-
--Definition of error codes.
local error_code = {
"0", "-1", "-2", "-3", "-4", "-5", "-6", "-7", "-8", "-100",
"-101", "-102", "-103", "-108", "-110", "-111", "-112",
"-113", "-114", "-115", "-118", "-119" }
-
-
---errorcode
+--err message.
local error_msg = {
err0 = "Ok",
err1 = "System error",