This is an automated email from the ASF dual-hosted git repository.

xiaoyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-shenyu-nginx.git


The following commit(s) were added to refs/heads/main by this push:
     new 85a76fe  fix npe in multi-workers scenario (#6)
85a76fe is described below

commit 85a76fee4ca183854e4e203fb5ad10395bbc8e36
Author: zhc <[email protected]>
AuthorDate: Sat May 21 16:55:22 2022 +0800

    fix npe in multi-workers scenario (#6)
---
 example/nginx.conf              |  7 +++--
 lib/shenyu/register/etcd.lua    | 66 ++++++++++++++++++++++++++++++++++-------
 test/it/gateway/conf/nginx.conf |  2 +-
 3 files changed, 61 insertions(+), 14 deletions(-)

diff --git a/example/nginx.conf b/example/nginx.conf
index 022baec..868cfcf 100644
--- a/example/nginx.conf
+++ b/example/nginx.conf
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-worker_processes  1;
+worker_processes  4;
 daemon off;
 error_log /dev/stdout debug;
 
@@ -23,13 +23,14 @@ events {
     worker_connections 1024;
 }
 http {
-    lua_shared_dict shenyu_instances 1m;
+    lua_shared_dict shenyu_storage 1m;
 
     init_worker_by_lua_block {
         local register = require("shenyu.register.etcd")
         register.init({
+            shenyu_storage = ngx.shared.shenyu_storage,
             balancer_type = "chash",
-            etcd_base_url = "http://192.168.0.6:2379",
+            etcd_base_url = "http://192.168.0.106:2379",
         })
     }
 
diff --git a/lib/shenyu/register/etcd.lua b/lib/shenyu/register/etcd.lua
index 6f3ee21..370d647 100644
--- a/lib/shenyu/register/etcd.lua
+++ b/lib/shenyu/register/etcd.lua
@@ -119,7 +119,7 @@ local function fetch_shenyu_instances(conf)
 
     local kvs = json.decode(res.body).kvs
     if not kvs then
-        return false
+        return nil, "get the empty shenyu instances from etcd."
     end
 
     local server_list = {}
@@ -137,6 +137,10 @@ local function fetch_shenyu_instances(conf)
     _M.revision = _revision + 1
     _M.shenyu_instances = shenyu_instances
 
+    local _servers = json.encode(server_list)
+    _M.storage:set("server_list", _servers)
+    _M.storage:set("revision", _M.revision)
+
     _M.balancer:init(server_list)
     return true
 end
@@ -195,6 +199,10 @@ local function parse_watch_response(response_body)
         end
         log(INFO, "updated upstream nodes successful.")
 
+        local _servers = json.encode(server_list)
+        _M.storage:set("server_list", _servers)
+        _M.storage:set("revision", _M.revision)
+
         _M.balancer:init(server_list)
 
         _M.revision = _revision + 1
@@ -202,6 +210,35 @@ local function parse_watch_response(response_body)
     end
 end
 
+local function sync(premature)
+    if premature or ngx_worker_exiting() then
+        return
+    end
+
+    local time_at = 1
+    local storage = _M.storage
+
+    local lock = storage:get("_lock")
+    local ver = storage:get("revision")
+
+    if lock and ver > _M.revision then
+        local server_list = storage:get("server_list")
+        local servers = json.decode(server_list)
+        if _M.revision <= 1 then
+            _M.balancer:init(servers)
+        else
+            _M.balancer:reinit(servers)
+        end
+        time_at = 0
+        _M.revision = ver
+    end
+
+    local ok, err = ngx_timer_at(time_at, sync)
+    if not ok then
+        log(ERR, "failed to start sync ", err)
+    end
+end
+
 local function watch(premature, watching)
     if premature or ngx_worker_exiting() then
         return
@@ -209,6 +246,8 @@ local function watch(premature, watching)
 
     if not watching then
         if not _M.etcd_conf then
+            _M.storage:set("_lock", false)
+
             local conf, err = parse_base_url(_M.etcd_base_url)
             if not conf then
                 log(ERR, err)
@@ -223,6 +262,7 @@ local function watch(premature, watching)
             _M.time_at = 3
         else
             watching = true
+            _M.storage:set("_lock", true)
         end
     else
         local conf = _M.etcd_conf
@@ -286,7 +326,7 @@ local function watch(premature, watching)
         until not buffer
         local ok, err = httpc:set_keepalive()
         if not ok then
-            ngx.say("failed to set keepalive: ", err)
+            log(ERR, "failed to set keepalive: ", err)
         end
     end
 
@@ -303,18 +343,24 @@ end
 --   etcd_base_url = "http://127.0.0.1:2379",
 -- }
 function _M.init(conf)
-    if ngx.worker.id() ~= 0 then
+    _M.storage = conf.shenyu_storage
+    _M.balancer = balancer.new(conf.balancer_type)
+
+    if ngx.worker.id() == 0 then
+        _M.shenyu_instances = {}
+        _M.etcd_base_url = conf.etcd_base_url
+
+        -- Start the etcd watcher
+        local ok, err = ngx_timer_at(0, watch)
+        if not ok then
+            log(ERR, "failed to start watch: " .. err)
+        end
         return
     end
 
-    _M.shenyu_instances = {}
-    _M.etcd_base_url = conf.etcd_base_url
-    _M.balancer = balancer.new(conf.balancer_type)
-
-    -- Start the etcd watcher
-    local ok, err = ngx_timer_at(0, watch)
+    local ok, err = ngx_timer_at(2, sync)
     if not ok then
-        log(ERR, "failed to start watch: " .. err)
+        log(ERR, "failed to start sync ", err)
     end
 end
 
diff --git a/test/it/gateway/conf/nginx.conf b/test/it/gateway/conf/nginx.conf
index fac4fc9..dc5795f 100644
--- a/test/it/gateway/conf/nginx.conf
+++ b/test/it/gateway/conf/nginx.conf
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-worker_processes  1;
+worker_processes  4;
 daemon off;
 error_log /dev/stdout debug;
 

Reply via email to