This is an automated email from the ASF dual-hosted git repository.

xiaoyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-shenyu-nginx.git


The following commit(s) were added to refs/heads/main by this push:
     new 675898e  add etcd register (#3)
675898e is described below

commit 675898e545acf2f961e67241e2544ddbed8960ad
Author: zhc <[email protected]>
AuthorDate: Wed May 18 09:41:07 2022 +0800

    add etcd register (#3)
---
 example/nginx.conf                    |  52 ++++++
 lib/shenyu/client.lua                 | 278 +++++++++++++++++++++++++++++
 lib/shenyu/register/balancer.lua      |  74 ++++++++
 lib/shenyu/register/etcd.lua          | 321 ++++++++++++++++++++++++++++++++++
 rockspec/shenyu-nginx-main-0.rockspec |  27 +++
 5 files changed, 752 insertions(+)

diff --git a/example/nginx.conf b/example/nginx.conf
new file mode 100644
index 0000000..022baec
--- /dev/null
+++ b/example/nginx.conf
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# shenyu.register.etcd only starts its discovery in worker 0, so this example
+# runs a single worker.
+worker_processes  1;
+daemon off;
+error_log /dev/stdout debug;
+
+events {
+    worker_connections 1024;
+}
+http {
+    lua_shared_dict shenyu_instances 1m;
+
+    # Start the etcd-backed discovery of Shenyu instances when the worker boots.
+    init_worker_by_lua_block {
+        local register = require("shenyu.register.etcd")
+        register.init({
+            balancer_type = "chash",
+            etcd_base_url = "http://192.168.0.6:2379",
+        })
+    }
+
+    upstream shenyu {
+        # Placeholder address; the real peer is selected by the Lua block below.
+        server 0.0.0.1;
+        balancer_by_lua_block {
+            require("shenyu.register.etcd").pick_and_set_peer()
+        }
+    }
+
+    server {
+        listen 80;
+
+        location ~ /* {
+            proxy_pass http://shenyu;
+        }
+    }
+}
+
+       
diff --git a/lib/shenyu/client.lua b/lib/shenyu/client.lua
new file mode 100644
index 0000000..a3759c4
--- /dev/null
+++ b/lib/shenyu/client.lua
@@ -0,0 +1,278 @@
+local http = require "http"
+local json = require("cjson.safe")
+local encode_base64 = ngx.encode_base64
+local decode_base64 = ngx.decode_base64
+local lrucache = require("resty.lrucache.pureffi")
+
+local ngx_time = ngx.time
+local ngx_timer_at = ngx.timer.at
+local ngx_worker_exiting = ngx.worker.exiting
+
+local re = ngx.re.match
+
+local ngx = ngx
+local log = ngx.log
+local ERR = ngx.ERR
+local WARN = ngx.WARN
+local INFO = ngx.INFO
+
+-- Module table and its discovery state.
+--   start_key / end_key: lowest and highest key under the etcd prefix
+--     "/shenyu/register/instance/" (the prefix followed by " " and by "~").
+--   revision: sent as start_revision of the watch request.
+local _M = {
+  start_key = "/shenyu/register/instance/ ",
+  end_key = "/shenyu/register/instance/~",
+  revision = 0,
+}
+
+-- lua_shared_dict shenyu_server_list 1m
+-- local _M.server_list = ngx.shared.shenyu_server_list
+
+-- Select the balancer implementation for this module. Only worker 0 does
+-- any work; every other worker returns immediately.
+--
+-- conf = {
+--   balancer_type = "chash",   -- any other value selects round robin
+--   shenyu_server_list = {},
+--   etcd_base_url = "http://127.0.0.1:2379",
+-- }
+function _M.init(conf)
+  -- ngx.worker.id is a function: comparing the function itself with 0 is
+  -- always "not equal", which made init() return early in every worker.
+  if ngx.worker.id() ~= 0 then
+    return
+  end
+
+  if conf.balancer_type == "chash" then
+    local balancer = require("resty.chash")
+    _M.build_upstream_servers = function(servers)
+      log(ERR, "not support yet.")
+    end
+    _M.balancer = balancer
+  else
+    local balancer = require("resty.roundrobin")
+    -- NOTE(review): reinit is invoked on the required module table, not on
+    -- an instance created with new() -- confirm this is intended.
+    _M.build_upstream_servers = function(servers)
+      balancer:reinit(servers)
+    end
+    _M.balancer = balancer
+  end
+
+  -- Start the etcd watcher
+  --local ok, err = ngx_timer_at(0, watch)
+  --if not ok then
+  --    log(ERR, "failed to start watch: " .. err)
+  --end
+end
+
+
+-- Pick an upstream server from the configured balancer and make it the peer
+-- of the current request.
+--
+-- The function is attached to the module table instead of being created as a
+-- global, so that it is reachable through require() like _M.init.
+-- key: optional hash key handed to the balancer's find().
+-- Returns true on success, or nil plus an error message.
+function _M.pick_and_set_peer(key)
+  -- find() is a method: it has to be called with ":" to receive the balancer
+  local server = _M.balancer:find(key)
+  if not server then
+    log(ERR, "failed to pick peer: no server available")
+    return nil, "no server available"
+  end
+
+  -- the previous code referenced an undefined `balancer` variable here
+  local ok, err = require("ngx.balancer").set_current_peer(server)
+  if not ok then
+    log(ERR, "failed to set current peer: ", err)
+    return nil, err
+  end
+  return true
+end
+
+
+-- Ask the etcd server for its version and map it onto the HTTP API prefix.
+-- <= 3.2 /v3alpha
+-- = 3.3 /v3beta
+-- >= 3.4 /v3
+--
+-- Declared ahead of parse_base_url, which calls it: a local function is only
+-- visible to the code that follows its declaration.
+-- Returns the prefix, or nil plus an error message.
+local function detect_etcd_version(base_url)
+  local httpc = http.new()
+  local res, err = httpc:request_uri(base_url .. "/version")
+  if not res then
+    return nil, "failed to get version from etcd server, " .. (err or "unknown")
+  end
+
+  -- cjson.safe returns nil instead of raising on a malformed body
+  local response = json.decode(res.body)
+  if type(response) ~= "table" or type(response.etcdcluster) ~= "string" then
+    return nil, "failed to resolve etcd version, unexpected response body"
+  end
+
+  local m
+  m, err = re(response.etcdcluster, "^(\\d+)\\.(\\d+)\\.(\\d+)$")
+  if not m then
+    return nil, "failed to resolve etcd version, " .. (err or "unknown")
+  end
+
+  if tonumber(m[1]) ~= 3 then
+    log(ERR, "support etcd v3 only.")
+  end
+
+  local ver_minor = tonumber(m[2])
+  if ver_minor <= 2 then
+    return "/v3alpha"
+  elseif ver_minor == 3 then
+    return "/v3beta"
+  else
+    return "/v3"
+  end
+end
+
+
+-- Split an etcd base URL into scheme, host and port and detect the API
+-- prefix of the server behind it.
+-- Returns { scheme, host, port, base_url, prefix }, or nil plus an error
+-- message (the error is logged as well).
+local function parse_base_url(base_url)
+  if type(base_url) ~= "string" then
+    log(ERR, "etcd base_url must be a string")
+    return nil, "etcd base_url must be a string"
+  end
+
+  local m, err = re(base_url,
+    [=[([^\/]+):\/\/([\da-zA-Z.-]+|\[[\da-fA-F:]+\]):?(\d+)?(\/)?$]=], "jo")
+  if not m then
+    err = "failed to parse etcd base_url[" .. base_url .. "], " .. (err or "unknown")
+    log(ERR, err)
+    return nil, err
+  end
+
+  local scheme, host = m[1], m[2]
+  -- The port group is optional in the pattern; fall back to the default port
+  -- of the scheme instead of concatenating a missing capture.
+  local port = tonumber(m[3]) or (scheme == "https" and 443 or 80)
+
+  local url = scheme .. "://" .. host .. ":" .. port
+  local prefix, version_err = detect_etcd_version(url)
+  if not prefix then
+    log(ERR, version_err)
+    return nil, version_err
+  end
+
+  return {
+    scheme = scheme,
+    host = host,
+    port = port,
+    base_url = url,
+    prefix = prefix,
+  }
+end
+
+
+-- List the Shenyu instances stored in etcd with a POST to
+-- "<base_url><prefix>/kv/range" and write every returned key/value into
+-- shenyu_server_list.
+--
+-- NOTE(review): several names used below do not resolve in this file and
+-- need to be confirmed before this function can run:
+--   * start_key / end_key are read as bare names, while the values live in
+--     _M.start_key / _M.end_key;
+--   * request_uri is called with "." here but with ":" in
+--     detect_etcd_version;
+--   * update_revision is declared as a local function only further down;
+--   * parse_value and build_server_list are not defined in this file;
+--   * execution continues after the "failed to list" log, where res is nil.
+local function fetch_shenyu_instances(etcd_conf, shenyu_server_list)
+  local server_list = shenyu_server_list
+
+  -- range request covering every key between start_key and end_key
+  local range_request = {
+    key = encode_base64(start_key),
+    range_end = encode_base64(end_key),
+  }
+  local httpc = http.new()
+  local res, err = httpc.request_uri(etcd_conf.base_url .. etcd_conf.prefix .. 
"/kv/range", {
+    method = "POST",
+    body = json.encode(range_request),
+  })
+
+  if not res then
+    log(ERR, "failed to list shenyu instances from etcd", err)
+  end
+
+  -- server_list = {
+  --   ["host:port"] = {
+  --     host,
+  --     port,
+  --     ...
+  --   }
+  -- }
+  local kvs = json.decode(res.body).kvs
+  for _, kv in pairs(kvs) do
+    update_revision(kv.mod_revision)
+    server_list:set(kv.key, parse_value(kv.value))
+  end
+
+  build_server_list(server_list)
+end
+
+
+-- Remember the etcd revision that has been processed in _M.revision.
+-- mod_revision: revision reported by etcd, as a number or a numeric string.
+-- force: when truthy, a revision older than the stored one is rejected.
+--
+-- The parameter used to be spelled `mode_revision` while the body read
+-- `mod_revision`, and the stored value was a bare global `revision` rather
+-- than the _M.revision field used by the watch request.
+local function update_revision(mod_revision, force)
+  local rev = tonumber(mod_revision)
+  if not rev then
+    log(ERR, "failed to update revision because the revision is invalid")
+    return
+  end
+
+  if force and _M.revision > rev then
+    log(ERR, "failed to update revision because the revision greater than specific")
+    return
+  end
+  _M.revision = rev
+end
+
+
+-- Timer callback that keeps the server list in sync with etcd.
+--   premature: set by the timer when it fires early; nothing is done then.
+--   etcd_conf: parsed endpoint (host, port, scheme) used for the connection.
+--   watching:  falsy on the first pass (load the full list), truthy once
+--              the watch request should be issued.
+--
+-- NOTE(review): points to confirm before enabling this function:
+--   * `conf` and `server_list` in the first-pass branch are neither
+--     parameters nor locals of this file;
+--   * the first-pass branch returns without scheduling the watching pass;
+--   * ngx.ERROR is used in the read loop while the rest of the file uses
+--     ngx.ERR -- confirm the constant exists;
+--   * parse_watch_response is declared as a local function only further down;
+--   * the path is fixed to "/v3/watch" although a prefix is detected per
+--     etcd version;
+--   * execution continues after a failed connect or request.
+local function watch(premature, etcd_conf, watching)
+  if premature or ngx_worker_exiting() then
+    return
+  end
+
+  local httpc = http.new()
+  if not watching then
+    -- first pass: resolve the endpoint and load the current instances
+    local etcd_conf = parse_base_url(conf.etcd_base_url)
+
+    _M.server_list = conf.shenyu_server_list
+    fetch_shenyu_instances(etcd_conf, server_list)
+    return
+  end
+
+  local ok, err = httpc:connect(etcd_conf.host, etcd_conf.port, {
+    scheme = etcd_conf.scheme,
+  })
+  if not ok then
+    -- return nil, "faliled to connect to etcd server", err
+    log(ERR, "faliled to connect to etcd server", err)
+  end
+
+  -- message WatchCreateRequest {
+  --   bytes key = 1;
+  --   bytes range_end = 2;
+  --   int64 start_revision = 3;
+  --   bool progress_notify = 4;
+  --   enum FilterType {
+  --     NOPUT = 0;
+  --     NODELETE = 1;
+  --   }
+  --   repeated FilterType filters = 5;
+  --   bool prev_kv = 6;
+  -- }
+  local request = {
+    create_request = {
+      key = encode_base64(start_key),
+      range_end = encode_base64(end_key),
+      start_revision = _M.revision,
+    }
+  }
+
+  local res, err = httpc:request({
+    path = "/v3/watch",
+    method = "POST",
+    body = json.encode(request),
+  })
+  if not res then
+    log(ERR, "failed to watch keys under '/shenyu/register/instance/'", err)
+  end
+
+  local reader = res.body_reader
+  local buffer_size = 8192
+
+  -- read the streamed response chunk by chunk until the reader returns nothing
+  repeat
+       local buffer, err = reader(buffer_size)
+       if err then
+      if err == "timeout" then
+        ngx.log(ngx.ERROR, "============", err)
+      end
+      ngx.log(ngx.ERROR, err)
+       end
+
+       if buffer then
+      print(buffer)
+      parse_watch_response(buffer)
+    end
+  until not buffer
+
+  -- the stream ended: schedule the next watching pass one second from now
+  local ok, err = ngx_timer_at(1, watch, etcd_conf, true)
+  if not ok then
+      log(ERR, "faield start watch: ", err)
+  end
+end
+
+-- Apply the events of one watch response to the server list and rebuild
+-- the upstream servers afterwards.
+-- response_body: JSON text of a WatchResponse.
+local function parse_watch_response(response_body)
+  -- message WatchResponse {
+  --   ResponseHeader header = 1;
+  --   int64 watch_id = 2;
+  --   bool created = 3;
+  --   bool canceled = 4;
+  --   int64 compact_revision = 5;
+
+  --   repeated mvccpb.Event events = 11;
+  -- }
+  -- cjson.safe returns nil for text that is not valid JSON
+  local response = json.decode(response_body)
+  if type(response) ~= "table" then
+    return
+  end
+
+  -- lib/shenyu/register/etcd.lua reads the events from response.result;
+  -- accept that wrapper as well as a bare WatchResponse.
+  local result = response.result
+  if type(result) ~= "table" then
+    result = response
+  end
+  local events = result.events
+
+  -- not updated
+  if type(events) ~= "table" then
+    return
+  end
+
+  -- watch() stores the list in _M.server_list; _M.shenyu_server_list is
+  -- never assigned anywhere in this file.
+  local server_list = _M.server_list
+  -- message Event {
+  --   enum EventType {
+  --     PUT = 0;
+  --     DELETE = 1;
+  --   }
+  --   EventType type = 1;
+  --   KeyValue kv = 2;
+  --   KeyValue prev_kv = 3;
+  -- }
+  for _, event in pairs(events) do
+    local kv = event.kv
+    if type(kv) == "table" and kv.key then
+      update_revision(kv.mod_revision, true)
+
+      -- event.type: delete (etcd.lua compares against the name "DELETE",
+      -- this file used the numeric value; both are accepted)
+      if event.type == 1 or event.type == "DELETE" then
+        log(INFO, "remove shenyu server instance[" .. kv.key .. "].")
+        server_list:delete(kv.key)
+      else
+        server_list:set(kv.key, 1)
+      end
+    end
+  end
+
+  -- init() installs the builder on the module table, not as a global
+  _M.build_upstream_servers(server_list)
+end
+
+
+return _M
diff --git a/lib/shenyu/register/balancer.lua b/lib/shenyu/register/balancer.lua
new file mode 100644
index 0000000..39d37ad
--- /dev/null
+++ b/lib/shenyu/register/balancer.lua
@@ -0,0 +1,74 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--    http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local _M = {}
+local str_null = string.char(0)
+
+-- Translate a { ["host:port"] = weight } table into the two tables used by
+-- the chash variant: id -> "host:port" and id -> weight, where the id is the
+-- server string with every ":" replaced by str_null.
+local function build_chash_nodes(server_list)
+    local servers, nodes = {}, {}
+    for serv, weight in pairs(server_list) do
+        -- gsub returns (string, count); the parentheses keep the string only
+        local id = (string.gsub(serv, ":", str_null))
+
+        servers[id] = serv
+        nodes[id] = weight
+    end
+    return servers, nodes
+end
+
+-- Install init/reinit/find for the requested balancer type on the module
+-- table and return it.
+-- balancer_type: "chash" or "roundrobin"; nil selects "roundrobin".
+-- Returns the module table, or nil (after logging an error) when the type
+-- is unknown.
+function _M.new(balancer_type)
+    balancer_type = balancer_type or "roundrobin"
+    if balancer_type == "chash" then
+        _M.init = function(self, server_list)
+            local servers, nodes = build_chash_nodes(server_list)
+
+            _M.balancer = require("resty.chash"):new(nodes)
+            _M.servers = servers
+        end
+
+        _M.reinit = function(self, server_list)
+            local servers, nodes = build_chash_nodes(server_list)
+
+            _M.balancer:reinit(nodes)
+            _M.servers = servers
+        end
+
+        _M.find = function(self, key)
+            -- the hash ring yields the id; map it back to "host:port"
+            local id = _M.balancer:find(key)
+            return _M.servers[id]
+        end
+    elseif balancer_type == "roundrobin" then
+        _M.init = function(self, servers)
+            _M.balancer = require("resty.roundrobin"):new(servers)
+        end
+
+        _M.reinit = function(self, servers)
+            _M.balancer:reinit(servers)
+        end
+
+        _M.find = function(self, key)
+            -- round robin does not use the key
+            return _M.balancer:find()
+        end
+    else
+        -- `log` and `ERR` are not defined in this file, so the previous call
+        -- raised an error instead of logging; use ngx.log directly.
+        ngx.log(ngx.ERR, "unknown balancer_type[" .. tostring(balancer_type) .. "]")
+        return
+    end
+
+    return _M
+end
+
+return _M
diff --git a/lib/shenyu/register/etcd.lua b/lib/shenyu/register/etcd.lua
new file mode 100644
index 0000000..27a020d
--- /dev/null
+++ b/lib/shenyu/register/etcd.lua
@@ -0,0 +1,321 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--    http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local _M = {}
+
+local http = require("resty.http")
+local json = require("cjson.safe")
+local ngx_balancer = require("ngx.balancer")
+
+local balancer = require("shenyu.register.balancer")
+
+local ngx = ngx
+
+local re = ngx.re.match
+local ngx_timer_at = ngx.timer.at
+local ngx_worker_exiting = ngx.worker.exiting
+local encode_base64 = ngx.encode_base64
+local decode_base64 = ngx.decode_base64
+
+local log = ngx.log
+local ERR = ngx.ERR
+local INFO = ngx.INFO
+
+-- Key bounds sent (base64-encoded) as key / range_end of the range and
+-- watch requests: the instance prefix followed by " " and by "~".
+_M.start_key = "/shenyu/register/instance/ "
+_M.end_key = "/shenyu/register/instance/~"
+-- Sent as start_revision of the watch request; updated after each fetch
+-- and after each processed watch response.
+_M.revision = -1
+-- Delay in seconds passed to ngx.timer.at for the next run of watch().
+_M.time_at = 0
+
+-- Ask the etcd server for its version and map it onto the HTTP API prefix.
+-- <= 3.2 /v3alpha
+-- == 3.3 /v3beta
+-- >= 3.4 /v3
+-- Returns the prefix, or nil plus an error message. Earlier the failures
+-- were only logged and the function went on to index a nil value.
+local function detect_etcd_version(base_url)
+    local httpc = http.new()
+    local res, err = httpc:request_uri(base_url .. "/version")
+    if not res then
+        return nil, "failed to get version from etcd server, " .. (err or "unknown")
+    end
+
+    -- cjson.safe returns nil instead of raising on a malformed body
+    local response = json.decode(res.body)
+    if type(response) ~= "table" or type(response.etcdcluster) ~= "string" then
+        return nil, "failed to resolve etcd version, unexpected response body"
+    end
+
+    local m
+    m, err = re(response.etcdcluster, "^(\\d+)\\.(\\d+)\\.(\\d+)$")
+    if not m then
+        return nil, "failed to resolve etcd version, " .. (err or "unknown")
+    end
+
+    if tonumber(m[1]) ~= 3 then
+        log(ERR, "support etcd v3 only.")
+    end
+
+    local ver_minor = tonumber(m[2])
+    if ver_minor <= 2 then
+        return "/v3alpha"
+    elseif ver_minor == 3 then
+        return "/v3beta"
+    else
+        return "/v3"
+    end
+end
+
+-- Split an etcd base URL into scheme, host and port and detect the API
+-- prefix of the server behind it.
+-- Returns { scheme, host, port, base_url, prefix }, or nil plus an error
+-- message.
+local function parse_base_url(base_url)
+    if type(base_url) ~= "string" then
+        return nil, "failed to parse etcd base_url, a string is required"
+    end
+
+    local m, err = re(base_url,
+        [=[([^\/]+):\/\/([\da-zA-Z.-]+|\[[\da-fA-F:]+\]):?(\d+)?(\/)?$]=], "jo")
+    if not m then
+        return nil, "failed to parse etcd base_url[" .. base_url .. "], " .. (err or "unknown")
+    end
+
+    local scheme, host = m[1], m[2]
+    -- The port group is optional in the pattern; fall back to the default
+    -- port of the scheme instead of concatenating a missing capture.
+    local port = tonumber(m[3]) or (scheme == "https" and 443 or 80)
+
+    local url = scheme .. "://" .. host .. ":" .. port
+    local prefix, version_err = detect_etcd_version(url)
+    if not prefix then
+        return nil, version_err
+    end
+
+    return {
+        scheme = scheme,
+        host = host,
+        port = port,
+        base_url = url,
+        prefix = prefix,
+    }
+end
+
+-- Decode the base64/JSON value of an instance key into "host:port".
+-- Returns the address, or nil plus an error message when the value cannot
+-- be decoded or lacks host/port.
+local function parse_value(value)
+    if type(value) ~= "string" then
+        return nil, "instance value is missing"
+    end
+
+    local raw = decode_base64(value)
+    if not raw then
+        return nil, "instance value is not valid base64"
+    end
+
+    -- cjson.safe returns nil plus a message instead of raising
+    local obj, err = json.decode(raw)
+    if type(obj) ~= "table" then
+        return nil, "instance value is not a JSON object, " .. (err or "unknown")
+    end
+
+    local host, port = obj.host, obj.port
+    if type(host) ~= "string" or (type(port) ~= "number" and type(port) ~= "string") then
+        return nil, "instance value has no host or port"
+    end
+    return host .. ":" .. port
+end
+
+-- Load the complete list of Shenyu instances from etcd and initialise the
+-- balancer with it.
+-- conf: table returned by parse_base_url (base_url and prefix are used).
+-- Returns true on success, or nil plus an error message. An empty result is
+-- reported as a failure so that the caller retries instead of building a
+-- balancer without any peer.
+local function fetch_shenyu_instances(conf)
+    local range_request = {
+        key = encode_base64(_M.start_key),
+        range_end = encode_base64(_M.end_key),
+    }
+
+    local httpc = http.new()
+    local res, err = httpc:request_uri(conf.base_url .. conf.prefix .. "/kv/range", {
+        method = "POST",
+        body = json.encode(range_request),
+    })
+    if not res then
+        return nil, "failed to list shenyu instances from etcd, " .. (err or "unknown")
+    end
+
+    if res.status >= 400 then
+        return nil, "failed to send range_request to etcd, " .. (res.body or "unknown")
+    end
+
+    -- cjson.safe returns nil instead of raising on a malformed body
+    local response = json.decode(res.body)
+    if type(response) ~= "table" then
+        return nil, "failed to decode range response from etcd"
+    end
+
+    -- "kvs" may be absent from the response; treat that as an empty list
+    local kvs = response.kvs
+    if type(kvs) ~= "table" then
+        kvs = {}
+    end
+
+    local revision = _M.revision
+    -- The range response is a full snapshot, so start from an empty map:
+    -- instances that disappeared while not watching must not survive.
+    local shenyu_instances = {}
+    local server_list = {}
+
+    for _, kv in pairs(kvs) do
+        local ver = tonumber(kv.mod_revision)
+        if ver and revision < ver then
+            revision = ver
+        end
+
+        local server, parse_err = parse_value(kv.value)
+        if server then
+            server_list[server] = 1
+            shenyu_instances[kv.key] = server
+        else
+            log(ERR, "failed to parse shenyu instance, key: " .. tostring(kv.key)
+                .. ", " .. (parse_err or "unknown"))
+        end
+    end
+
+    if next(server_list) == nil then
+        return nil, "no shenyu instances found in etcd"
+    end
+
+    -- watch from the revision following the newest one seen here
+    _M.revision = revision + 1
+    _M.shenyu_instances = shenyu_instances
+
+    _M.balancer:init(server_list)
+    return true
+end
+
+-- Apply the events of one watch response to _M.shenyu_instances and, when
+-- something changed, rebuild the balancer from the resulting instance map.
+-- response_body: one chunk read from the watch stream, expected to be the
+-- JSON text of a response; anything that does not decode is ignored.
+local function parse_watch_response(response_body)
+    -- message WatchResponse {
+    --   ResponseHeader header = 1;
+    --   int64 watch_id = 2;
+    --   bool created = 3;
+    --   bool canceled = 4;
+    --   int64 compact_revision = 5;
+
+    --   repeated mvccpb.Event events = 11;
+    -- }
+    local response = json.decode(response_body)
+    if type(response) ~= "table" or type(response.result) ~= "table" then
+        return
+    end
+    local events = response.result.events
+    if type(events) ~= "table" then
+        return
+    end
+    log(INFO, "watch response: " .. response_body)
+
+    local shenyu_instances = _M.shenyu_instances
+    -- message Event {
+    --   enum EventType {
+    --     PUT = 0;
+    --     DELETE = 1;
+    --   }
+    --   EventType type = 1;
+    --   KeyValue kv = 2;
+    --   KeyValue prev_kv = 3;
+    -- }
+
+    local latest           -- highest mod_revision among the events
+    local changed = false  -- whether the instance map was modified
+    for _, event in pairs(events) do
+        local kv = event.kv
+        if type(kv) == "table" and kv.key then
+            local ver = tonumber(kv.mod_revision)
+            if ver and (not latest or ver > latest) then
+                latest = ver
+            end
+
+            -- event.type: delete
+            if event.type == "DELETE" then
+                log(INFO, "remove upstream node of shenyu, key: " .. kv.key)
+                shenyu_instances[kv.key] = nil
+                changed = true
+            else
+                local updated, err = parse_value(kv.value)
+                if updated then
+                    log(INFO, "update upstream node of shenyu: " .. updated)
+                    -- the previous code assigned the undefined name `update`
+                    -- (nil), which removed the node instead of storing it
+                    shenyu_instances[kv.key] = updated
+                    changed = true
+                else
+                    log(ERR, "failed to parse upstream node of shenyu, key: " .. kv.key
+                        .. ", " .. (err or "unknown"))
+                end
+            end
+        end
+    end
+
+    -- Rebuild whenever the map changed. Comparing revisions with "<" skipped
+    -- the rebuild for an event whose mod_revision equals the start_revision
+    -- being watched, i.e. the very next write after a fetch.
+    if changed then
+        local server_list = {}
+        for _, value in pairs(shenyu_instances) do
+            server_list[value] = 1
+        end
+
+        -- a failing rebuild must not abort the timer that runs the watch
+        local ok, err = pcall(_M.balancer.init, _M.balancer, server_list)
+        if ok then
+            log(INFO, "updated upstream nodes successful.")
+        else
+            log(ERR, "failed to update upstream nodes: " .. tostring(err))
+        end
+        _M.shenyu_instances = shenyu_instances
+    end
+
+    -- continue watching after the newest revision that has been processed
+    if latest and latest >= _M.revision then
+        _M.revision = latest + 1
+    end
+end
+
+-- Seconds to wait before retrying after a failure.
+local RETRY_DELAY = 3
+
+-- First pass of the watcher: resolve the etcd endpoint and load the current
+-- instance list. Sets _M.etcd_conf and _M.time_at.
+-- Returns true when the next pass may start watching, false to retry.
+local function sync_instances()
+    local conf, err = parse_base_url(_M.etcd_base_url)
+    if not conf then
+        -- retry instead of giving up, so that discovery is not stopped for
+        -- the lifetime of the worker
+        log(ERR, err)
+        _M.time_at = RETRY_DELAY
+        return false
+    end
+    _M.etcd_conf = conf
+
+    local ok, fetch_err = fetch_shenyu_instances(conf)
+    if not ok then
+        log(ERR, fetch_err)
+        _M.time_at = RETRY_DELAY
+        return false
+    end
+
+    _M.time_at = 0
+    return true
+end
+
+-- Watching pass: open a watch on the instance key range and feed every
+-- chunk of the streamed response to parse_watch_response until the stream
+-- ends. Returns the delay in seconds before the next pass.
+local function stream_watch_events(conf)
+    local httpc = http.new()
+    local ok, err = httpc:connect({
+        scheme = conf.scheme,
+        host = conf.host,
+        port = tonumber(conf.port),
+    })
+    if not ok then
+        -- do not send the request over a connection that was never opened
+        log(ERR, "failed to connect to etcd server", err)
+        return RETRY_DELAY
+    end
+
+    -- message WatchCreateRequest {
+    --   bytes key = 1;
+    --   bytes range_end = 2;
+    --   int64 start_revision = 3;
+    --   bool progress_notify = 4;
+    --   enum FilterType {
+    --     NOPUT = 0;
+    --     NODELETE = 1;
+    --   }
+    --   repeated FilterType filters = 5;
+    --   bool prev_kv = 6;
+    -- }
+    local request = {
+        create_request = {
+            key = encode_base64(_M.start_key),
+            range_end = encode_base64(_M.end_key),
+            start_revision = _M.revision,
+        }
+    }
+
+    local res, req_err = httpc:request({
+        -- use the prefix detected for this etcd version, as the range
+        -- request does, instead of a fixed "/v3"
+        path = (conf.prefix or "/v3") .. "/watch",
+        method = "POST",
+        body = json.encode(request),
+    })
+    if not res then
+        log(ERR, "failed to watch keys under '/shenyu/register/instance/'", req_err)
+        httpc:close()
+        return RETRY_DELAY
+    end
+
+    local reader = res.body_reader
+    local buffer_size = 8192
+
+    -- NOTE(review): each chunk is decoded on its own; a response larger than
+    -- buffer_size would be split across chunks -- confirm this cannot happen.
+    repeat
+        local buffer, read_err = reader(buffer_size)
+        if read_err then
+            if read_err ~= "timeout" then
+                log(ERR, read_err)
+            end
+            httpc:close()
+            return 0
+        end
+
+        if buffer then
+            parse_watch_response(buffer)
+        end
+    until not buffer
+
+    local kept, keep_err = httpc:set_keepalive()
+    if not kept then
+        -- this runs inside a timer, where there is no client to answer with
+        -- ngx.say; write to the error log instead
+        log(ERR, "failed to set keepalive: ", keep_err)
+    end
+    return 0
+end
+
+-- Timer callback driving the discovery loop. It reschedules itself after
+-- every pass with the delay stored in _M.time_at.
+--   premature: set by the timer when it fires early; nothing is done then.
+--   watching:  falsy while the instance list still has to be loaded, true
+--              once the watch stream should be consumed.
+local function watch(premature, watching)
+    if premature or ngx_worker_exiting() then
+        return
+    end
+
+    if not watching then
+        watching = sync_instances()
+    else
+        _M.time_at = stream_watch_events(_M.etcd_conf)
+    end
+
+    local ok, err = ngx_timer_at(_M.time_at, watch, watching)
+    if not ok then
+        log(ERR, "failed to start watch: ", err)
+    end
+end
+
+-- Start the etcd-based discovery of Shenyu instances. Only worker 0 does
+-- any work; every other worker returns immediately.
+--
+-- conf = {
+--   balancer_type = "chash",   -- or "roundrobin", the default
+--   etcd_base_url = "http://127.0.0.1:2379",
+-- }
+--
+-- NOTE(review): the balancer is stored on this module table by worker 0
+-- only -- confirm how other workers are expected to obtain it when more
+-- than one worker process is configured.
+function _M.init(conf)
+    if ngx.worker.id() ~= 0 then
+        return
+    end
+
+    if type(conf) ~= "table" or type(conf.etcd_base_url) ~= "string" then
+        log(ERR, "failed to init shenyu register: etcd_base_url is required")
+        return
+    end
+
+    -- balancer.new returns nothing for an unknown balancer_type
+    local selected = balancer.new(conf.balancer_type)
+    if not selected then
+        log(ERR, "failed to init shenyu register: unknown balancer_type")
+        return
+    end
+
+    _M.shenyu_instances = {}
+    _M.etcd_base_url = conf.etcd_base_url
+    _M.balancer = selected
+
+    -- Start the etcd watcher
+    local ok, err = ngx_timer_at(0, watch)
+    if not ok then
+        log(ERR, "failed to start watch: " .. (err or "unknown"))
+    end
+end
+
+-- Pick an upstream server from the balancer and make it the peer of the
+-- current request.
+-- key: optional hash key handed to the balancer's find().
+-- Returns true on success, or nil plus an error message (also logged).
+function _M.pick_and_set_peer(key)
+    local picker = _M.balancer
+    -- picker.balancer is created by the balancer's init(), which only runs
+    -- once the first instance list has been fetched
+    if not picker or not picker.balancer then
+        log(ERR, "failed to pick peer: balancer is not initialised")
+        return nil, "balancer is not initialised"
+    end
+
+    local server = picker:find(key)
+    if not server then
+        log(ERR, "failed to pick peer: no shenyu instance available")
+        return nil, "no shenyu instance available"
+    end
+
+    local ok, err = ngx_balancer.set_current_peer(server)
+    if not ok then
+        log(ERR, "failed to set current peer[" .. server .. "]: ", err)
+        return nil, err
+    end
+    return true
+end
+
+return _M
diff --git a/rockspec/shenyu-nginx-main-0.rockspec b/rockspec/shenyu-nginx-main-0.rockspec
new file mode 100644
index 0000000..07f31af
--- /dev/null
+++ b/rockspec/shenyu-nginx-main-0.rockspec
@@ -0,0 +1,27 @@
+package = "shenyu-nginx"
+version = "main-0"
+source = {
+   -- "git+https" selects the git fetcher, which is what `branch` applies to
+   url = "git+https://github.com/apache/incubator-shenyu-nginx",
+   branch = "main",
+}
+
+description = {
+   summary = "Discovery Apache Shenyu servers for Nginx",
+   homepage = "https://github.com/apache/incubator-shenyu-nginx",
+   license = "Apache License 2.0"
+}
+
+dependencies = {
+    "lua-resty-balancer >= 0.04",
+    "lua-resty-http >= 0.15",
+    "lua-cjson = 2.1.0.6-1",
+}
+
+-- Lua modules installed by this rock
+build = {
+   type = "builtin",
+   modules = {
+      ["shenyu.register.etcd"] = "lib/shenyu/register/etcd.lua",
+      ["shenyu.register.balancer"] = "lib/shenyu/register/balancer.lua",
+   }
+}
+

Reply via email to