This is an automated email from the ASF dual-hosted git repository.
xiaoyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-shenyu-nginx.git
The following commit(s) were added to refs/heads/main by this push:
new 85a76fe fix npe in multi-workers scenario (#6)
85a76fe is described below
commit 85a76fee4ca183854e4e203fb5ad10395bbc8e36
Author: zhc <[email protected]>
AuthorDate: Sat May 21 16:55:22 2022 +0800
fix npe in multi-workers scenario (#6)
---
example/nginx.conf | 7 +++--
lib/shenyu/register/etcd.lua | 66 ++++++++++++++++++++++++++++++++++-------
test/it/gateway/conf/nginx.conf | 2 +-
3 files changed, 61 insertions(+), 14 deletions(-)
diff --git a/example/nginx.conf b/example/nginx.conf
index 022baec..868cfcf 100644
--- a/example/nginx.conf
+++ b/example/nginx.conf
@@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
-worker_processes 1;
+worker_processes 4;
daemon off;
error_log /dev/stdout debug;
@@ -23,13 +23,14 @@ events {
worker_connections 1024;
}
http {
- lua_shared_dict shenyu_instances 1m;
+ lua_shared_dict shenyu_storage 1m;
init_worker_by_lua_block {
local register = require("shenyu.register.etcd")
register.init({
+ shenyu_storage = ngx.shared.shenyu_storage,
balancer_type = "chash",
- etcd_base_url = "http://192.168.0.6:2379",
+ etcd_base_url = "http://192.168.0.106:2379",
})
}
diff --git a/lib/shenyu/register/etcd.lua b/lib/shenyu/register/etcd.lua
index 6f3ee21..370d647 100644
--- a/lib/shenyu/register/etcd.lua
+++ b/lib/shenyu/register/etcd.lua
@@ -119,7 +119,7 @@ local function fetch_shenyu_instances(conf)
local kvs = json.decode(res.body).kvs
if not kvs then
- return false
+ return nil, "get the empty shenyu instances from etcd."
end
local server_list = {}
@@ -137,6 +137,10 @@ local function fetch_shenyu_instances(conf)
_M.revision = _revision + 1
_M.shenyu_instances = shenyu_instances
+ local _servers = json.encode(server_list)
+ _M.storage:set("server_list", _servers)
+ _M.storage:set("revision", _M.revision)
+
_M.balancer:init(server_list)
return true
end
@@ -195,6 +199,10 @@ local function parse_watch_response(response_body)
end
log(INFO, "updated upstream nodes successful.")
+ local _servers = json.encode(server_list)
+ _M.storage:set("server_list", _servers)
+ _M.storage:set("revision", _M.revision)
+
_M.balancer:init(server_list)
_M.revision = _revision + 1
@@ -202,6 +210,35 @@ local function parse_watch_response(response_body)
end
end
+local function sync(premature)
+ if premature or ngx_worker_exiting() then
+ return
+ end
+
+ local time_at = 1
+ local storage = _M.storage
+
+ local lock = storage:get("_lock")
+ local ver = storage:get("revision")
+
+ if lock and ver > _M.revision then
+ local server_list = storage:get("server_list")
+ local servers = json.decode(server_list)
+ if _M.revision <= 1 then
+ _M.balancer:init(servers)
+ else
+ _M.balancer:reinit(servers)
+ end
+ time_at = 0
+ _M.revision = ver
+ end
+
+ local ok, err = ngx_timer_at(time_at, sync)
+ if not ok then
+ log(ERR, "failed to start sync ", err)
+ end
+end
+
local function watch(premature, watching)
if premature or ngx_worker_exiting() then
return
@@ -209,6 +246,8 @@ local function watch(premature, watching)
if not watching then
if not _M.etcd_conf then
+ _M.storage:set("_lock", false)
+
local conf, err = parse_base_url(_M.etcd_base_url)
if not conf then
log(ERR, err)
@@ -223,6 +262,7 @@ local function watch(premature, watching)
_M.time_at = 3
else
watching = true
+ _M.storage:set("_lock", true)
end
else
local conf = _M.etcd_conf
@@ -286,7 +326,7 @@ local function watch(premature, watching)
until not buffer
local ok, err = httpc:set_keepalive()
if not ok then
- ngx.say("failed to set keepalive: ", err)
+ log(ERR, "failed to set keepalive: ", err)
end
end
@@ -303,18 +343,24 @@ end
-- etcd_base_url = "http://127.0.0.1:2379",
-- }
function _M.init(conf)
- if ngx.worker.id() ~= 0 then
+ _M.storage = conf.shenyu_storage
+ _M.balancer = balancer.new(conf.balancer_type)
+
+ if ngx.worker.id() == 0 then
+ _M.shenyu_instances = {}
+ _M.etcd_base_url = conf.etcd_base_url
+
+ -- Start the etcd watcher
+ local ok, err = ngx_timer_at(0, watch)
+ if not ok then
+ log(ERR, "failed to start watch: " .. err)
+ end
return
end
- _M.shenyu_instances = {}
- _M.etcd_base_url = conf.etcd_base_url
- _M.balancer = balancer.new(conf.balancer_type)
-
- -- Start the etcd watcher
- local ok, err = ngx_timer_at(0, watch)
+ local ok, err = ngx_timer_at(2, sync)
if not ok then
- log(ERR, "failed to start watch: " .. err)
+ log(ERR, "failed to start sync ", err)
end
end
diff --git a/test/it/gateway/conf/nginx.conf b/test/it/gateway/conf/nginx.conf
index fac4fc9..dc5795f 100644
--- a/test/it/gateway/conf/nginx.conf
+++ b/test/it/gateway/conf/nginx.conf
@@ -15,7 +15,7 @@
# specific language governing permissions and limitations
# under the License.
-worker_processes 1;
+worker_processes 4;
daemon off;
error_log /dev/stdout debug;