This is an automated email from the ASF dual-hosted git repository.
xiaoyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-shenyu-nginx.git
The following commit(s) were added to refs/heads/main by this push:
new 675898e add etcd register (#3)
675898e is described below
commit 675898e545acf2f961e67241e2544ddbed8960ad
Author: zhc <[email protected]>
AuthorDate: Wed May 18 09:41:07 2022 +0800
add etcd register (#3)
---
example/nginx.conf | 52 ++++++
lib/shenyu/client.lua | 278 +++++++++++++++++++++++++++++
lib/shenyu/register/balancer.lua | 74 ++++++++
lib/shenyu/register/etcd.lua | 321 ++++++++++++++++++++++++++++++++++
rockspec/shenyu-nginx-main-0.rockspec | 27 +++
5 files changed, 752 insertions(+)
diff --git a/example/nginx.conf b/example/nginx.conf
new file mode 100644
index 0000000..022baec
--- /dev/null
+++ b/example/nginx.conf
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# shenyu.register.etcd only initialises its balancer and etcd watcher in
+# worker 0, so this example runs a single worker.
+worker_processes 1;
+daemon off;
+error_log /dev/stdout debug;
+
+events {
+    worker_connections 1024;
+}
+http {
+    lua_shared_dict shenyu_instances 1m;
+
+    init_worker_by_lua_block {
+        local register = require("shenyu.register.etcd")
+        register.init({
+            balancer_type = "chash",
+            etcd_base_url = "http://192.168.0.6:2379",
+        })
+    }
+
+    upstream shenyu {
+        # Placeholder entry; the peer actually used is set by the Lua block below.
+        server 0.0.0.1;
+        balancer_by_lua_block {
+            -- The "chash" balancer hashes the key it is given. Without a key
+            -- every request hashes the same value, so pass a per-client one.
+            require("shenyu.register.etcd").pick_and_set_peer(ngx.var.remote_addr)
+        }
+    }
+
+    server {
+        listen 80;
+
+        # Prefix match for every URI (the former regex "~ /*" also matched
+        # everything, but only by accident of "zero or more slashes").
+        location / {
+            proxy_pass http://shenyu;
+        }
+    }
+}
+
+
diff --git a/lib/shenyu/client.lua b/lib/shenyu/client.lua
new file mode 100644
index 0000000..a3759c4
--- /dev/null
+++ b/lib/shenyu/client.lua
@@ -0,0 +1,278 @@
+local http = require "http"
+local json = require("cjson.safe")
+local encode_base64 = ngx.encode_base64
+local decode_base64 = ngx.decode_base64
+local lrucache = require("resty.lrucache.pureffi")
+
+local ngx_time = ngx.time
+local ngx_timer_at = ngx.timer.at
+local ngx_worker_exiting = ngx.worker.exiting
+
+local re = ngx.re.match
+
+local ngx = ngx
+local log = ngx.log
+local ERR = ngx.ERR
+local WARN = ngx.WARN
+local INFO = ngx.INFO
+
+-- Module table: bounds of the etcd key range used for shenyu instances and
+-- the last revision recorded by this module.
+local _M = {}
+_M.start_key = "/shenyu/register/instance/ "
+_M.end_key = "/shenyu/register/instance/~"
+_M.revision = 0
+
+-- lua_shared_dict shenyu_server_list 1m
+-- local _M.server_list = ngx.shared.shenyu_server_list
+
+-- conf = {
+-- balance_type = "chash",
+-- shenyu_server_list = {},
+-- etcd_base_url = "http://127.0.0.1:2379",
+-- }
+-- Initialise the module in worker 0 and select the balancer implementation.
+--
+-- conf.balancer_type == "chash" selects resty.chash, anything else selects
+-- resty.roundrobin. The configuration table is kept on the module (_M.conf)
+-- so that code running later can read etcd_base_url / shenyu_server_list.
+--
+-- NOTE(review): the chash branch only logs "not support yet." when asked to
+-- rebuild the upstream list, and the etcd watcher start is still commented
+-- out below.
+function _M.init(conf)
+    -- ngx.worker.id is a function. Comparing the function value itself with 0
+    -- is always "not equal", which made init() return in every worker.
+    if ngx.worker.id() ~= 0 then
+        return
+    end
+
+    _M.conf = conf
+
+    if conf.balancer_type == "chash" then
+        local balancer = require("resty.chash")
+        _M.build_upstream_servers = function(servers)
+            log(ERR, "not support yet.")
+        end
+        _M.balancer = balancer
+    else
+        local balancer = require("resty.roundrobin")
+        _M.build_upstream_servers = function(servers)
+            balancer:reinit(servers)
+        end
+        _M.balancer = balancer
+    end
+
+    -- Start the etcd watcher
+    --local ok, err = ngx_timer_at(0, watch)
+    --if not ok then
+    --    log(ERR, "failed to start watch: " .. err)
+    --end
+end
+
+
+-- Pick an upstream server from the configured balancer and make it the
+-- current peer of the request being balanced.
+--
+-- The function is attached to the module table (it used to be an accidental
+-- global, unreachable through the value returned by require()). The previous
+-- body referenced an undefined `balancer` variable and called `find` without
+-- a receiver, so it could never succeed.
+--
+-- key: optional value forwarded to the balancer's find().
+function _M.pick_and_set_peer(key)
+    -- required here so the block does not depend on a new file-level local
+    local ngx_balancer = require("ngx.balancer")
+
+    local server = _M.balancer and _M.balancer:find(key)
+    if not server then
+        log(ERR, "no shenyu server available")
+        return
+    end
+
+    local ok, err = ngx_balancer.set_current_peer(server)
+    if not ok then
+        log(ERR, "failed to set the current peer: ", err)
+    end
+end
+
+
+-- <= 3.2 /v3alpha
+-- = 3.3 /v3beta
+-- >= 3.4 /v3
+--
+-- Query `<base_url>/version` and map the reported cluster version to the
+-- gateway path prefix listed above.
+-- Returns the prefix string, or nil plus an error message.
+--
+-- Defined before parse_base_url: a `local function` is only visible to code
+-- that follows it, so the previous ordering made the call below resolve to a
+-- nil global.
+local function detect_etcd_version(base_url)
+    local httpc = http.new()
+    local res, err = httpc:request_uri(base_url .. "/version")
+    if not res then
+        log(ERR, "failed to get version from etcd server.", err)
+        return nil, err or "failed to get version from etcd server."
+    end
+
+    -- cjson.safe returns nil instead of raising on malformed input
+    local response = json.decode(res.body)
+    if type(response) ~= "table" or type(response.etcdcluster) ~= "string" then
+        log(ERR, "failed to resolve etcd version.", "unexpected response body")
+        return nil, "failed to resolve etcd version."
+    end
+
+    local m
+    m, err = re(response.etcdcluster, "^(\\d+)\\.(\\d+)\\.(\\d+)$")
+    if not m then
+        log(ERR, "failed to resolve etcd version.", err)
+        return nil, err or "failed to resolve etcd version."
+    end
+
+    if tonumber(m[1]) ~= 3 then
+        log(ERR, "support etcd v3 only.")
+        return nil, "support etcd v3 only."
+    end
+
+    local ver_minor = tonumber(m[2])
+    if ver_minor <= 2 then
+        return "/v3alpha"
+    elseif ver_minor == 3 then
+        return "/v3beta"
+    else
+        return "/v3"
+    end
+end
+
+
+-- Split an etcd base URL into scheme, host and port and detect the API prefix.
+-- Returns a table { scheme, host, port, base_url, prefix }, or nil plus an
+-- error message when the URL does not match or the version lookup fails.
+local function parse_base_url(base_url)
+    local m, err = re(base_url, [=[([^\/]+):\/\/([\da-zA-Z.-]+|\[[\da-fA-F:]+\]):?(\d+)?(\/)?$]=], "jo")
+    if not m then
+        log(ERR, err)
+        return nil, err or "failed to parse etcd base_url"
+    end
+
+    -- The port group is optional in the pattern; an absent capture is not a
+    -- string and cannot be concatenated. Fall back to the scheme's default.
+    local port = tonumber(m[3])
+    if not port then
+        port = (m[1] == "https") and 443 or 80
+    end
+
+    local normalized = m[1] .. "://" .. m[2] .. ":" .. port
+    local prefix, verr = detect_etcd_version(normalized)
+    if not prefix then
+        return nil, verr
+    end
+
+    return {
+        scheme = m[1],
+        host = m[2],
+        port = port,
+        base_url = normalized,
+        prefix = prefix,
+    }
+end
+
+
+-- Record the revision of the latest key/value seen.
+-- With `force` set, a revision lower than the one already stored is rejected
+-- (and logged) instead of overwriting it.
+--
+-- Defined before its first caller; the parameter was previously spelled
+-- `mode_revision` while the body used `mod_revision`, and `revision` was an
+-- undefined global instead of the module field.
+local function update_revision(mod_revision, force)
+    -- tonumber() accepts both numeric and string-encoded revisions
+    local incoming = tonumber(mod_revision)
+    if not incoming then
+        return
+    end
+
+    if force and _M.revision > incoming then
+        log(ERR, "failed to update revision because the revision greater than specific")
+        return
+    end
+    _M.revision = incoming
+end
+
+
+-- Decode a base64 JSON instance record into "host:port".
+-- Returns nil when the value cannot be decoded.
+-- NOTE(review): this file called parse_value() without defining it; this
+-- mirrors the helper of the same name in lib/shenyu/register/etcd.lua.
+local function parse_value(value)
+    local raw = type(value) == "string" and decode_base64(value)
+    local obj = raw and json.decode(raw)
+    if type(obj) ~= "table" or not obj.host or not obj.port then
+        return nil
+    end
+    return obj.host .. ":" .. obj.port
+end
+
+
+-- Load every registered shenyu instance from etcd with one range request and
+-- store it in `shenyu_server_list` (an object exposing :set(key, value)).
+-- Returns true on success, or nil plus an error message.
+local function fetch_shenyu_instances(etcd_conf, shenyu_server_list)
+    local server_list = shenyu_server_list
+
+    -- start_key / end_key live on the module table, not in globals
+    local range_request = {
+        key = encode_base64(_M.start_key),
+        range_end = encode_base64(_M.end_key),
+    }
+    local httpc = http.new()
+    -- method call: request_uri needs the client object as its receiver
+    local res, err = httpc:request_uri(etcd_conf.base_url .. etcd_conf.prefix .. "/kv/range", {
+        method = "POST",
+        body = json.encode(range_request),
+    })
+
+    if not res then
+        log(ERR, "failed to list shenyu instances from etcd", err)
+        return nil, err
+    end
+
+    -- server_list = {
+    --     ["host:port"] = {
+    --         host,
+    --         port,
+    --         ...
+    --     }
+    -- }
+    local body = json.decode(res.body)
+    if type(body) ~= "table" then
+        log(ERR, "failed to list shenyu instances from etcd", "invalid response body")
+        return nil, "invalid response body"
+    end
+
+    -- `kvs` may be absent when no key falls inside the requested range
+    for _, kv in pairs(body.kvs or {}) do
+        update_revision(kv.mod_revision)
+        local value = parse_value(kv.value)
+        if value then
+            server_list:set(kv.key, value)
+        end
+    end
+
+    -- NOTE(review): the original called an undefined build_server_list();
+    -- _M.build_upstream_servers (installed by _M.init) looks like the
+    -- intended target - confirm.
+    if _M.build_upstream_servers then
+        _M.build_upstream_servers(server_list)
+    end
+    return true
+end
+
+
+-- Apply one chunk of a watch response to the server list.
+--
+-- Defined before watch(): a `local function` is only visible to code that
+-- follows it, so the previous ordering made watch() call a nil global.
+local function parse_watch_response(response_body)
+    -- message WatchResponse {
+    --     ResponseHeader header = 1;
+    --     int64 watch_id = 2;
+    --     bool created = 3;
+    --     bool canceled = 4;
+    --     int64 compact_revision = 5;
+
+    --     repeated mvccpb.Event events = 11;
+    -- }
+    local response = json.decode(response_body)
+    if type(response) ~= "table" then
+        return
+    end
+
+    -- lib/shenyu/register/etcd.lua reads the events from `response.result`;
+    -- accept that envelope as well as a bare response.
+    local result = type(response.result) == "table" and response.result or response
+    local events = result.events
+
+    -- not updated
+    if not events then
+        return
+    end
+
+    -- watch() stores the list under _M.server_list
+    local server_list = _M.server_list
+    -- message Event {
+    --     enum EventType {
+    --         PUT = 0;
+    --         DELETE = 1;
+    --     }
+    --     EventType type = 1;
+    --     KeyValue kv = 2;
+    --     KeyValue prev_kv = 3;
+    -- }
+    for _, event in pairs(events) do
+        local kv = event.kv
+        update_revision(kv.mod_revision, true)
+
+        -- event.type: delete (numeric, or the enum name as etcd.lua expects)
+        if event.type == 1 or event.type == "DELETE" then
+            log(INFO, "remove shenyu server instance[" .. kv.key .. "].")
+            server_list:delete(kv.key)
+        else
+            -- NOTE(review): the initial fetch stores a parsed value here,
+            -- while this path stores 1 - confirm which one is intended.
+            server_list:set(kv.key, 1)
+        end
+    end
+
+    -- build_upstream_servers is a field of the module table, not a global
+    _M.build_upstream_servers(server_list)
+end
+
+
+-- Timer callback driving the etcd synchronisation.
+--
+-- First run (`watching` falsy): resolve the etcd endpoint from the
+-- configuration kept in _M.conf, load the current instances, then schedule
+-- the watching run. Later runs (`watching` true): open a watch on the
+-- instance key range, feed every received chunk to parse_watch_response()
+-- and re-arm the timer when the stream ends.
+local function watch(premature, etcd_conf, watching)
+    if premature or ngx_worker_exiting() then
+        return
+    end
+
+    if not watching then
+        -- `conf` and `server_list` used to be read from undefined globals
+        local conf = _M.conf
+        if not conf then
+            log(ERR, "failed to start watch: module configuration is missing")
+            return
+        end
+
+        local parsed = parse_base_url(conf.etcd_base_url)
+        if parsed then
+            _M.server_list = conf.shenyu_server_list
+            fetch_shenyu_instances(parsed, _M.server_list)
+        end
+
+        -- The original returned here without re-arming the timer, so the
+        -- watching phase was never reached. Retry the bootstrap when the
+        -- endpoint could not be resolved.
+        local ok, err = ngx_timer_at(1, watch, parsed, parsed ~= nil)
+        if not ok then
+            log(ERR, "faield start watch: ", err)
+        end
+        return
+    end
+
+    local httpc = http.new()
+    local ok, err = httpc:connect(etcd_conf.host, etcd_conf.port, {
+        scheme = etcd_conf.scheme,
+    })
+    if not ok then
+        -- return nil, "faliled to connect to etcd server", err
+        log(ERR, "faliled to connect to etcd server", err)
+    end
+
+    local res
+    if ok then
+        -- message WatchCreateRequest {
+        --     bytes key = 1;
+        --     bytes range_end = 2;
+        --     int64 start_revision = 3;
+        --     bool progress_notify = 4;
+        --     enum FilterType {
+        --         NOPUT = 0;
+        --         NODELETE = 1;
+        --     }
+        --     repeated FilterType filters = 5;
+        --     bool prev_kv = 6;
+        -- }
+        local request = {
+            create_request = {
+                key = encode_base64(_M.start_key),
+                range_end = encode_base64(_M.end_key),
+                start_revision = _M.revision,
+            }
+        }
+
+        res, err = httpc:request({
+            -- use the detected API prefix, as the range request does
+            path = etcd_conf.prefix .. "/watch",
+            method = "POST",
+            body = json.encode(request),
+        })
+        if not res then
+            log(ERR, "failed to watch keys under '/shenyu/register/instance/'", err)
+        end
+    end
+
+    local reader = res and res.body_reader
+    if reader then
+        local buffer_size = 8192
+        while true do
+            local buffer, read_err = reader(buffer_size)
+            if read_err then
+                -- ngx.ERROR is not a log level; a read timeout just means no
+                -- event arrived, so only other errors are reported.
+                if read_err ~= "timeout" then
+                    log(ERR, read_err)
+                end
+                break
+            end
+            if not buffer then
+                break
+            end
+            parse_watch_response(buffer)
+        end
+    end
+
+    ok, err = ngx_timer_at(1, watch, etcd_conf, true)
+    if not ok then
+        log(ERR, "faield start watch: ", err)
+    end
+end
+
+
+return _M
diff --git a/lib/shenyu/register/balancer.lua b/lib/shenyu/register/balancer.lua
new file mode 100644
index 0000000..39d37ad
--- /dev/null
+++ b/lib/shenyu/register/balancer.lua
@@ -0,0 +1,74 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local _M = {}
+local str_null = string.char(0)
+
+-- Turn { ["host:port"] = weight } into the two tables used by the chash
+-- balancer: id -> "host:port" and id -> weight, where id is the server
+-- string with every ":" replaced by a NUL byte.
+local function build_chash_nodes(server_list)
+    local servers, nodes = {}, {}
+    for serv, weight in pairs(server_list) do
+        -- gsub also returns the replacement count; keep only the string
+        local id = string.gsub(serv, ":", str_null)
+
+        servers[id] = serv
+        nodes[id] = weight
+    end
+    return servers, nodes
+end
+
+-- Configure the module as a balancer of the requested type and return it.
+--
+-- balancer_type: "chash" or "roundrobin" (default when nil).
+-- Returns the module table with init/reinit/find installed, or nothing when
+-- the type is unknown (an error is logged).
+--
+-- The state is stored on the module table itself, so there is one balancer
+-- per Lua VM and a later call to new() replaces the installed functions.
+function _M.new(balancer_type)
+    local balancer_type = (balancer_type or "roundrobin")
+    if balancer_type == "chash" then
+        _M.init = function(self, server_list)
+            local servers, nodes = build_chash_nodes(server_list)
+
+            _M.balancer = require("resty.chash"):new(nodes)
+            _M.servers = servers
+        end
+
+        _M.reinit = function(self, server_list)
+            local servers, nodes = build_chash_nodes(server_list)
+
+            _M.balancer:reinit(nodes)
+            _M.servers = servers
+        end
+
+        _M.find = function(self, key)
+            -- map the id chosen by the hash ring back to "host:port"
+            local id = _M.balancer:find(key)
+            return _M.servers[id]
+        end
+    elseif balancer_type == "roundrobin" then
+        _M.init = function(self, servers)
+            _M.balancer = require("resty.roundrobin"):new(servers)
+        end
+
+        _M.reinit = function(self, servers)
+            _M.balancer:reinit(servers)
+        end
+
+        _M.find = function(self, key)
+            -- round robin ignores the key
+            return _M.balancer:find()
+        end
+    else
+        -- `log` and `ERR` are not defined in this file; calling them raised
+        -- "attempt to call a nil value" instead of reporting the bad type.
+        ngx.log(ngx.ERR, "unknown balancer_type[" .. tostring(balancer_type) .. "]")
+        return
+    end
+
+    return _M
+end
+
+return _M
diff --git a/lib/shenyu/register/etcd.lua b/lib/shenyu/register/etcd.lua
new file mode 100644
index 0000000..27a020d
--- /dev/null
+++ b/lib/shenyu/register/etcd.lua
@@ -0,0 +1,321 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+local _M = {}
+
+local http = require("resty.http")
+local json = require("cjson.safe")
+local ngx_balancer = require("ngx.balancer")
+
+local balancer = require("shenyu.register.balancer")
+
+local ngx = ngx
+
+local re = ngx.re.match
+local ngx_timer_at = ngx.timer.at
+local ngx_worker_exiting = ngx.worker.exiting
+local encode_base64 = ngx.encode_base64
+local decode_base64 = ngx.decode_base64
+
+local log = ngx.log
+local ERR = ngx.ERR
+local INFO = ngx.INFO
+
+-- start_key / end_key: bounds sent as `key` / `range_end` in the range and
+-- watch requests. revision: value sent as the watch `start_revision`.
+-- time_at: delay in seconds used when the watch timer is re-armed.
+_M.start_key, _M.end_key = "/shenyu/register/instance/ ", "/shenyu/register/instance/~"
+_M.revision, _M.time_at = -1, 0
+
+-- <= 3.2 /v3alpha
+-- == 3.3 /v3beta
+-- >= 3.4 /v3
+-- Query `<base_url>/version` and map the reported cluster version to the
+-- gateway path prefix listed above.
+--
+-- Returns the prefix string ("/v3alpha", "/v3beta" or "/v3"), or nil plus an
+-- error message. Every failure is logged and returned; previously the
+-- function kept going after logging and then indexed a nil value.
+local function detect_etcd_version(base_url)
+    local httpc = http.new()
+    local res, err = httpc:request_uri(base_url .. "/version")
+    if not res then
+        log(ERR, "failed to get version from etcd server.", err)
+        return nil, "failed to get version from etcd server, " .. (err or "unknown")
+    end
+
+    -- cjson.safe returns nil instead of raising on malformed input
+    local response = json.decode(res.body)
+    if type(response) ~= "table" or type(response.etcdcluster) ~= "string" then
+        log(ERR, "failed to resolve etcd version.", "unexpected response body")
+        return nil, "failed to resolve etcd version, unexpected response body"
+    end
+
+    local m
+    m, err = re(response.etcdcluster, "^(\\d+)\\.(\\d+)\\.(\\d+)$")
+    if not m then
+        log(ERR, "failed to resolve etcd version.", err)
+        return nil, "failed to resolve etcd version, " .. (err or "unknown")
+    end
+
+    if tonumber(m[1]) ~= 3 then
+        log(ERR, "support etcd v3 only.")
+        return nil, "support etcd v3 only."
+    end
+
+    local ver_minor = tonumber(m[2])
+    if ver_minor <= 2 then
+        return "/v3alpha"
+    elseif ver_minor == 3 then
+        return "/v3beta"
+    else
+        return "/v3"
+    end
+end
+
+-- Split an etcd base URL into its parts and detect the API path prefix.
+--
+-- Returns { scheme, host, port, base_url, prefix }, or nil plus an error
+-- message when the URL does not match the pattern or the etcd version cannot
+-- be determined.
+local function parse_base_url(base_url)
+    local m, err = re(base_url, [=[([^\/]+):\/\/([\da-zA-Z.-]+|\[[\da-fA-F:]+\]):?(\d+)?(\/)?$]=], "jo")
+    if not m then
+        return nil, "failed to parse etcd base_url[" .. tostring(base_url) .. "], " .. (err or "unknown")
+    end
+
+    -- The port group is optional in the pattern; an absent capture is not a
+    -- string and cannot be concatenated. Fall back to the scheme's default.
+    local port = tonumber(m[3])
+    if not port then
+        port = (m[1] == "https") and 443 or 80
+    end
+
+    local normalized = m[1] .. "://" .. m[2] .. ":" .. port
+
+    -- only the first return value is used when detection cannot fail softly
+    local prefix, verr = detect_etcd_version(normalized)
+    if not prefix then
+        return nil, "failed to detect etcd version of " .. normalized .. ", " .. (verr or "unknown")
+    end
+
+    return {
+        scheme = m[1],
+        host = m[2],
+        port = port,
+        base_url = normalized,
+        prefix = prefix,
+    }
+end
+
+-- Decode a base64-encoded JSON instance record and return it as "host:port".
+-- Returns nil plus an error message when the value is not valid base64, not
+-- valid JSON, or lacks `host` / `port`.
+local function parse_value(value)
+    if type(value) ~= "string" then
+        return nil, "instance value is missing"
+    end
+
+    local raw = decode_base64(value)
+    if not raw then
+        return nil, "instance value is not valid base64"
+    end
+
+    -- cjson.safe returns nil plus a message instead of raising
+    local obj, err = json.decode(raw)
+    if type(obj) ~= "table" or not obj.host or not obj.port then
+        return nil, "invalid instance value: " .. (err or raw)
+    end
+
+    return obj.host .. ":" .. obj.port
+end
+
+-- Load all registered shenyu instances from etcd with a single range request
+-- and (re)initialise the balancer with them.
+--
+-- conf: table produced by parse_base_url (uses base_url and prefix).
+-- Side effects: updates _M.shenyu_instances (etcd key -> "host:port"), sets
+-- _M.revision to the highest mod_revision seen plus one, and calls
+-- _M.balancer:init().
+-- Returns true on success, or nil plus an error message.
+local function fetch_shenyu_instances(conf)
+    local range_request = {
+        key = encode_base64(_M.start_key),
+        range_end = encode_base64(_M.end_key),
+    }
+
+    local httpc = http.new()
+    local res, err = httpc:request_uri(conf.base_url .. conf.prefix .. "/kv/range", {
+        method = "POST",
+        body = json.encode(range_request),
+    })
+    if not res then
+        return nil, "failed to list shenyu instances from etcd, " .. (err or "unknown")
+    end
+
+    if res.status >= 400 then
+        if not res.body then
+            return nil, "failed to send range_request to etcd, " .. (err or "unknown")
+        end
+        return nil, "failed to send range_request to etcd, " .. res.body
+    end
+
+    local body = json.decode(res.body)
+    if type(body) ~= "table" then
+        return nil, "failed to decode range response from etcd"
+    end
+
+    local _revision = _M.revision
+    local shenyu_instances = _M.shenyu_instances
+
+    local server_list = {}
+    -- `kvs` can be absent when no key falls inside the range; pairs(nil)
+    -- would raise, so iterate over an empty table instead.
+    local kvs = body.kvs or {}
+
+    for _, kv in pairs(kvs) do
+        local ver = tonumber(kv.mod_revision)
+        if ver and _revision < ver then
+            _revision = ver
+        end
+
+        local key, perr = parse_value(kv.value)
+        if key then
+            server_list[key] = 1
+            shenyu_instances[kv.key] = key
+        else
+            -- one malformed record must not abort the whole listing
+            log(ERR, "skip invalid shenyu instance, ", perr)
+        end
+    end
+
+    -- the watch that follows starts right after the newest revision seen
+    _M.revision = _revision + 1
+    _M.shenyu_instances = shenyu_instances
+
+    _M.balancer:init(server_list)
+    return true
+end
+
+-- Apply one chunk of the watch stream to the known instances and rebuild the
+-- balancer when something changed.
+--
+-- response_body: JSON text; chunks that do not decode to an object carrying
+-- `result.events` are ignored.
+-- Side effects: updates _M.shenyu_instances, calls _M.balancer:init() and
+-- advances _M.revision to the newest event revision plus one.
+local function parse_watch_response(response_body)
+    -- message WatchResponse {
+    --     ResponseHeader header = 1;
+    --     int64 watch_id = 2;
+    --     bool created = 3;
+    --     bool canceled = 4;
+    --     int64 compact_revision = 5;
+
+    --     repeated mvccpb.Event events = 11;
+    -- }
+    local response = json.decode(response_body)
+    if type(response) ~= "table" or type(response.result) ~= "table" then
+        return
+    end
+    local events = response.result.events
+    if not events then
+        return
+    end
+    log(INFO, "watch response: " .. response_body)
+
+    local shenyu_instances = _M.shenyu_instances
+    -- message Event {
+    --     enum EventType {
+    --         PUT = 0;
+    --         DELETE = 1;
+    --     }
+    --     EventType type = 1;
+    --     KeyValue kv = 2;
+    --     KeyValue prev_kv = 3;
+    -- }
+
+    -- highest mod_revision among the events of this chunk
+    local latest
+    for _, event in pairs(events) do
+        local kv = event.kv
+        if kv then
+            local rev = tonumber(kv.mod_revision)
+            if rev and (not latest or latest < rev) then
+                latest = rev
+            end
+
+            -- event.type: delete
+            if event.type == "DELETE" then
+                log(INFO, "remove upstream node of shenyu, key: " .. kv.key)
+                shenyu_instances[kv.key] = nil
+            else
+                local updated, perr = parse_value(kv.value)
+                if updated then
+                    log(INFO, "update upstream node of shenyu: " .. updated)
+                    -- was `= update`: an undefined global, i.e. nil, which
+                    -- removed the instance instead of storing it
+                    shenyu_instances[kv.key] = updated
+                else
+                    log(ERR, "skip invalid shenyu instance, ", perr)
+                end
+            end
+        end
+    end
+
+    -- _M.revision holds the next revision to watch from, so an event carrying
+    -- exactly that revision is new and must be applied too (`<` skipped it).
+    if latest and _M.revision <= latest then
+        local server_list = {}
+        for _, value in pairs(shenyu_instances) do
+            server_list[value] = 1
+        end
+        log(INFO, "updated upstream nodes successful.")
+
+        _M.balancer:init(server_list)
+
+        _M.revision = latest + 1
+        _M.shenyu_instances = shenyu_instances
+    end
+end
+
+-- Resolve the etcd endpoint and load the current instances.
+-- Returns true when the watcher may switch to the watching phase. On failure
+-- it logs, sets a 3 second retry delay and returns false.
+local function load_initial_state()
+    local conf, err = parse_base_url(_M.etcd_base_url)
+    if not conf then
+        log(ERR, err)
+        -- retry later; returning without re-arming the timer stopped the
+        -- watcher for the lifetime of the worker
+        _M.time_at = 3
+        return false
+    end
+    _M.etcd_conf = conf
+
+    local ok, ferr = fetch_shenyu_instances(conf)
+    if not ok then
+        log(ERR, ferr)
+        _M.time_at = 3
+        return false
+    end
+
+    _M.time_at = 0
+    return true
+end
+
+-- Open a watch on the instance key range and feed every received chunk to
+-- parse_watch_response() until the stream ends or a read fails.
+-- Sets _M.time_at to 3 when the connection or request fails, 0 otherwise.
+local function stream_watch_events(conf)
+    local httpc = http.new()
+    local ok, err = httpc:connect({
+        scheme = conf.scheme,
+        host = conf.host,
+        port = tonumber(conf.port),
+    })
+    if not ok then
+        log(ERR, "failed to connect to etcd server", err)
+        _M.time_at = 3
+        -- do not send a request on a socket that never connected
+        return
+    end
+
+    -- message WatchCreateRequest {
+    --     bytes key = 1;
+    --     bytes range_end = 2;
+    --     int64 start_revision = 3;
+    --     bool progress_notify = 4;
+    --     enum FilterType {
+    --         NOPUT = 0;
+    --         NODELETE = 1;
+    --     }
+    --     repeated FilterType filters = 5;
+    --     bool prev_kv = 6;
+    -- }
+    local request = {
+        create_request = {
+            key = encode_base64(_M.start_key),
+            range_end = encode_base64(_M.end_key),
+            start_revision = _M.revision,
+        }
+    }
+
+    local res, req_err = httpc:request({
+        -- same detected prefix as the range request, not a fixed "/v3"
+        path = conf.prefix .. "/watch",
+        method = "POST",
+        body = json.encode(request),
+    })
+    if not res then
+        log(ERR, "failed to watch keys under '/shenyu/register/instance/'", req_err)
+        _M.time_at = 3
+        httpc:close()
+        return
+    end
+
+    _M.time_at = 0
+
+    local reader = res.body_reader
+    if not reader then
+        httpc:close()
+        return
+    end
+
+    local buffer_size = 8192
+    while true do
+        local buffer, read_err = reader(buffer_size)
+        if read_err then
+            -- a read timeout only means no event arrived in time
+            if read_err ~= "timeout" then
+                log(ERR, read_err)
+            end
+            httpc:close()
+            return
+        end
+        if not buffer then
+            break
+        end
+        parse_watch_response(buffer)
+    end
+
+    local kept, keep_err = httpc:set_keepalive()
+    if not kept then
+        -- this runs inside a timer, where there is no response to write to
+        log(ERR, "failed to set keepalive: ", keep_err)
+    end
+end
+
+-- Timer callback driving the etcd synchronisation.
+--
+-- premature: set by nginx when the timer fires because the worker is
+--            shutting down.
+-- watching:  falsy until the initial instance list has been loaded; once
+--            true, each run opens a watch stream.
+-- The callback always re-arms itself with the delay stored in _M.time_at.
+local function watch(premature, watching)
+    if premature or ngx_worker_exiting() then
+        return
+    end
+
+    if not watching then
+        watching = load_initial_state()
+    else
+        stream_watch_events(_M.etcd_conf)
+    end
+
+    local ok, err = ngx_timer_at(_M.time_at, watch, watching)
+    if not ok then
+        log(ERR, "failed to start watch: ", err)
+    end
+end
+
+-- conf = {
+-- balance_type = "chash",
+-- etcd_base_url = "http://127.0.0.1:2379",
+-- }
+-- Create the balancer and start the etcd watcher.
+--
+-- conf.balancer_type is passed to the balancer factory and
+-- conf.etcd_base_url is the etcd endpoint to read instances from.
+--
+-- Only worker 0 gets past the first check. Every other worker returns before
+-- _M.balancer is assigned, so pick_and_set_peer() has no balancer to use
+-- there.
+function _M.init(conf)
+    if ngx.worker.id() ~= 0 then
+        return
+    end
+
+    -- the factory returns nothing for an unknown balancer_type
+    local instance = balancer.new(conf.balancer_type)
+    if not instance then
+        log(ERR, "failed to create balancer, balancer_type: ", tostring(conf.balancer_type))
+        return
+    end
+
+    _M.shenyu_instances = {}
+    _M.etcd_base_url = conf.etcd_base_url
+    _M.balancer = instance
+
+    -- Start the etcd watcher
+    local ok, err = ngx_timer_at(0, watch)
+    if not ok then
+        log(ERR, "failed to start watch: ", err)
+    end
+end
+
+-- Choose an upstream node and make it the peer of the current request.
+-- Meant to be called from balancer_by_lua*.
+--
+-- key: value forwarded to the balancer's find(); whether it is used depends
+--      on the balancer type.
+-- When no node can be chosen the request ends with 502; when the chosen node
+-- is rejected by ngx.balancer it ends with 500. Both cases are logged.
+function _M.pick_and_set_peer(key)
+    local instance = _M.balancer
+    -- `instance.balancer` is only assigned by the balancer's init(), which
+    -- the watcher calls after the first successful fetch from etcd
+    local server = instance and instance.balancer and instance:find(key)
+    if not server then
+        log(ERR, "no upstream node of shenyu available")
+        return ngx.exit(ngx.HTTP_BAD_GATEWAY)
+    end
+
+    -- set_current_peer reports failures through its return values
+    local ok, err = ngx_balancer.set_current_peer(server)
+    if not ok then
+        log(ERR, "failed to set the current peer: ", err)
+        return ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR)
+    end
+end
+
+return _M
diff --git a/rockspec/shenyu-nginx-main-0.rockspec
b/rockspec/shenyu-nginx-main-0.rockspec
new file mode 100644
index 0000000..07f31af
--- /dev/null
+++ b/rockspec/shenyu-nginx-main-0.rockspec
@@ -0,0 +1,27 @@
+package = "shenyu-nginx"
+version = "main-0"
+source = {
+    -- A "git+https" URL makes LuaRocks clone the repository and check out
+    -- `branch`; a plain https URL is treated as a file to download.
+    url = "git+https://github.com/apache/incubator-shenyu-nginx.git",
+    branch = "main",
+}
+
+description = {
+    summary = "Discovery Apache Shenyu servers for Nginx",
+    homepage = "https://github.com/apache/incubator-shenyu-nginx",
+    license = "Apache License 2.0"
+}
+
+dependencies = {
+    "lua-resty-balancer >= 0.04",
+    "lua-resty-http >= 0.15",
+    "lua-cjson = 2.1.0.6-1",
+}
+
+-- Only the two modules under shenyu.register are installed.
+build = {
+    type = "builtin",
+    modules = {
+        ["shenyu.register.etcd"] = "lib/shenyu/register/etcd.lua",
+        ["shenyu.register.balancer"] = "lib/shenyu/register/balancer.lua",
+    }
+}
+
+