This is an automated email from the ASF dual-hosted git repository.

monkeydluffy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 2cd71cea0 fix(consul): support to fetch only health endpoint (#9204)
2cd71cea0 is described below

commit 2cd71cea020b64864f295d22bf841ee127840f3b
Author: fabriceli <[email protected]>
AuthorDate: Thu May 4 22:55:46 2023 +0800

    fix(consul): support to fetch only health endpoint (#9204)
---
 apisix/discovery/consul/init.lua | 369 +++++++++++++++++++++++++++++++--------
 t/discovery/consul.t             |  75 ++++++++
 t/discovery/consul_dump.t        |  52 ++++++
 3 files changed, 423 insertions(+), 73 deletions(-)

diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua
index 686ab4120..ae1e4c64c 100644
--- a/apisix/discovery/consul/init.lua
+++ b/apisix/discovery/consul/init.lua
@@ -32,6 +32,14 @@ local ngx_timer_every    = ngx.timer.every
 local log                = core.log
 local json_delay_encode  = core.json.delay_encode
 local ngx_worker_id      = ngx.worker.id
+local thread_spawn       = ngx.thread.spawn
+local thread_wait        = ngx.thread.wait
+local thread_kill        = ngx.thread.kill
+local math_random        = math.random
+local pcall              = pcall
+local null               = ngx.null
+local type               = type
+local next               = next
 
 local all_services = core.table.new(0, 5)
 local default_service
@@ -44,9 +52,15 @@ local events_list
 local consul_services
 
 local default_skip_services = {"consul"}
+local default_random_range = 5
+local default_catalog_error_index = -1
+local default_health_error_index = -2
+local watch_type_catalog = 1
+local watch_type_health = 2
+local max_retry_time = 256
 
 local _M = {
-    version = 0.2,
+    version = 0.3,
 }
 
 
@@ -128,7 +142,7 @@ local function read_dump_services()
     local now_time = ngx.time()
     log.info("dump file last_update: ", entity.last_update, ", dump_params.expire: ",
         dump_params.expire, ", now_time: ", now_time)
-    if dump_params.expire ~= 0  and (entity.last_update + dump_params.expire) < now_time then
+    if dump_params.expire ~= 0 and (entity.last_update + dump_params.expire) < now_time then
         log.warn("dump file: ", dump_params.path, " had expired, ignored it")
         return
     end
@@ -145,7 +159,7 @@ local function write_dump_services()
         expire = dump_params.expire, -- later need handle it
     }
     local data = core.json.encode(entity)
-    local succ, err =  util.write_file(dump_params.path, data)
+    local succ, err = util.write_file(dump_params.path, data)
     if not succ then
         log.error("write dump into file got error: ", err)
     end
@@ -165,8 +179,9 @@ local function show_dump_file()
     return 200, data
 end
 
+
 local function get_retry_delay(retry_delay)
-    if not retry_delay then
+    if not retry_delay or retry_delay >= max_retry_time then
         retry_delay = 1
     else
         retry_delay = retry_delay * 4
@@ -176,80 +191,307 @@ local function get_retry_delay(retry_delay)
 end
 
 
+local function get_opts(consul_server, is_catalog)
+    local opts = {
+        host = consul_server.host,
+        port = consul_server.port,
+        connect_timeout = consul_server.connect_timeout,
+        read_timeout = consul_server.read_timeout,
+    }
+    if not consul_server.keepalive then
+        return opts
+    end
+
+    if is_catalog then
+        opts.default_args = {
+            wait = consul_server.wait_timeout, --blocked wait!=0; unblocked by wait=0
+            index = consul_server.catalog_index,
+        }
+    else
+        opts.default_args = {
+            wait = consul_server.wait_timeout, --blocked wait!=0; unblocked by wait=0
+            index = consul_server.health_index,
+        }
+    end
+
+    return opts
+end
+
+
+local function watch_catalog(consul_server)
+    local client = resty_consul:new(get_opts(consul_server, true))
+
+    ::RETRY::
+    local watch_result, watch_err = client:get(consul_server.consul_watch_catalog_url)
+    local watch_error_info = (watch_err ~= nil and watch_err)
+                             or ((watch_result ~= nil and watch_result.status ~= 200)
+                             and watch_result.status)
+    if watch_error_info then
+        log.error("connect consul: ", consul_server.consul_server_url,
+            " by sub url: ", consul_server.consul_watch_catalog_url,
+            ", got watch result: ", json_delay_encode(watch_result),
+            ", with error: ", watch_error_info)
+
+        return watch_type_catalog, default_catalog_error_index
+    end
+
+    if consul_server.catalog_index > 0
+            and consul_server.catalog_index == tonumber(watch_result.headers['X-Consul-Index']) then
+        local random_delay = math_random(default_random_range)
+        log.info("watch catalog has no change, re-watch consul after ", random_delay, " seconds")
+        core_sleep(random_delay)
+        goto RETRY
+    end
+
+    return watch_type_catalog, watch_result.headers['X-Consul-Index']
+end
+
+
+local function watch_health(consul_server)
+    local client = resty_consul:new(get_opts(consul_server, false))
+
+    ::RETRY::
+    local watch_result, watch_err = client:get(consul_server.consul_watch_health_url)
+    local watch_error_info = (watch_err ~= nil and watch_err)
+            or ((watch_result ~= nil and watch_result.status ~= 200)
+            and watch_result.status)
+    if watch_error_info then
+        log.error("connect consul: ", consul_server.consul_server_url,
+            " by sub url: ", consul_server.consul_watch_health_url,
+            ", got watch result: ", json_delay_encode(watch_result),
+            ", with error: ", watch_error_info)
+
+        return watch_type_health, default_health_error_index
+    end
+
+    if consul_server.health_index > 0
+            and consul_server.health_index == tonumber(watch_result.headers['X-Consul-Index']) then
+        local random_delay = math_random(default_random_range)
+        log.info("watch health has no change, re-watch consul after ", random_delay, " seconds")
+        core_sleep(random_delay)
+        goto RETRY
+    end
+
+    return watch_type_health, watch_result.headers['X-Consul-Index']
+end
+
+
+local function check_keepalive(consul_server, retry_delay)
+    if consul_server.keepalive then
+        local ok, err = ngx_timer_at(0, _M.connect, consul_server, retry_delay)
+        if not ok then
+            log.error("create ngx_timer_at got error: ", err)
+            return
+        end
+    end
+end
+
+
+local function update_index(consul_server, catalog_index, health_index)
+    local c_index = 0
+    local h_index = 0
+    if catalog_index ~= nil then
+        c_index = tonumber(catalog_index)
+    end
+
+    if health_index ~= nil then
+        h_index = tonumber(health_index)
+    end
+
+    if c_index > 0 then
+        consul_server.catalog_index = c_index
+    end
+
+    if h_index > 0 then
+        consul_server.health_index = h_index
+    end
+end
+
+
+local function is_not_empty(value)
+    if value == nil or value == null
+            or (type(value) == "table" and not next(value))
+            or (type(value) == "string" and value == "")
+    then
+        return false
+    end
+
+    return true
+end
+
+
+local function watch_result_is_valid(watch_type, index, catalog_index, health_index)
+    if index <= 0 then
+        return false
+    end
+
+    if watch_type == watch_type_catalog then
+        if index == catalog_index then
+            return false
+        end
+    else
+        if index == health_index then
+            return false
+        end
+    end
+
+    return true
+end
+
+
 function _M.connect(premature, consul_server, retry_delay)
     if premature then
         return
     end
 
+    local catalog_thread, spawn_catalog_err = thread_spawn(watch_catalog, consul_server)
+    if not catalog_thread then
+        local random_delay = math_random(default_random_range)
+        log.error("failed to spawn thread watch catalog: ", spawn_catalog_err,
+            ", retry connecting consul after ", random_delay, " seconds")
+        core_sleep(random_delay)
+
+        check_keepalive(consul_server, retry_delay)
+        return
+    end
+
+    local health_thread, err = thread_spawn(watch_health, consul_server)
+    if not health_thread then
+        thread_kill(catalog_thread)
+        local random_delay = math_random(default_random_range)
+        log.error("failed to spawn thread watch health: ", err, ", retry connecting consul after ",
+            random_delay, " seconds")
+        core_sleep(random_delay)
+
+        check_keepalive(consul_server, retry_delay)
+        return
+    end
+
+    local thread_wait_ok, watch_type, index = thread_wait(catalog_thread, health_thread)
+    thread_kill(catalog_thread)
+    thread_kill(health_thread)
+    if not thread_wait_ok then
+        local random_delay = math_random(default_random_range)
+        log.error("failed to wait thread: ", watch_type, ", retry connecting consul after ",
+                random_delay, " seconds")
+        core_sleep(random_delay)
+
+        check_keepalive(consul_server, retry_delay)
+        return
+    end
+
+    -- double check index has changed
+    if not watch_result_is_valid(tonumber(watch_type),
+            tonumber(index), consul_server.catalog_index, consul_server.health_index) then
+        retry_delay = get_retry_delay(retry_delay)
+        log.warn("get all svcs got err, retry connecting consul after ", retry_delay, " seconds")
+        core_sleep(retry_delay)
+
+        check_keepalive(consul_server, retry_delay)
+        return
+    end
+
     local consul_client = resty_consul:new({
         host = consul_server.host,
         port = consul_server.port,
         connect_timeout = consul_server.connect_timeout,
         read_timeout = consul_server.read_timeout,
-        default_args = consul_server.default_args,
     })
+    local catalog_success, catalog_res, catalog_err = pcall(function()
+        return consul_client:get(consul_server.consul_watch_catalog_url)
+    end)
+    if not catalog_success then
+        log.error("connect consul: ", consul_server.consul_server_url,
+            " by sub url: ", consul_server.consul_watch_catalog_url,
+            ", got catalog result: ", json_delay_encode(catalog_res))
+        check_keepalive(consul_server, retry_delay)
+        return
+    end
+    local catalog_error_info = (catalog_err ~= nil and catalog_err)
+            or ((catalog_res ~= nil and catalog_res.status ~= 200)
+            and catalog_res.status)
+    if catalog_error_info then
+        log.error("connect consul: ", consul_server.consul_server_url,
+            " by sub url: ", consul_server.consul_watch_catalog_url,
+            ", got catalog result: ", json_delay_encode(catalog_res),
+            ", with error: ", catalog_error_info)
 
-    log.info("consul_server: ", json_delay_encode(consul_server, true))
-    local watch_result, watch_err = consul_client:get(consul_server.consul_watch_sub_url)
-    local watch_error_info = (watch_err ~= nil and watch_err)
-            or ((watch_result ~= nil and watch_result.status ~= 200)
-            and watch_result.status)
-    if watch_error_info then
+        retry_delay = get_retry_delay(retry_delay)
+        log.warn("get all svcs got err, retry connecting consul after ", retry_delay, " seconds")
+        core_sleep(retry_delay)
+
+        check_keepalive(consul_server, retry_delay)
+        return
+    end
+
+    -- get health index
+    local success, health_res, health_err = pcall(function()
+        return consul_client:get(consul_server.consul_watch_health_url)
+    end)
+    if not success then
         log.error("connect consul: ", consul_server.consul_server_url,
-            " by sub url: ", consul_server.consul_watch_sub_url,
-            ", got watch result: ", json_delay_encode(watch_result, true),
-            ", with error: ", watch_error_info)
+            " by sub url: ", consul_server.consul_watch_health_url,
+            ", got health result: ", json_delay_encode(health_res))
+        check_keepalive(consul_server, retry_delay)
+        return
+    end
+    local health_error_info = (health_err ~= nil and health_err)
+            or ((health_res ~= nil and health_res.status ~= 200)
+            and health_res.status)
+    if health_error_info then
+        log.error("connect consul: ", consul_server.consul_server_url,
+            " by sub url: ", consul_server.consul_watch_health_url,
+            ", got health result: ", json_delay_encode(health_res),
+            ", with error: ", health_error_info)
 
         retry_delay = get_retry_delay(retry_delay)
-        log.warn("retry connecting consul after ", retry_delay, " seconds")
+        log.warn("get all svcs got err, retry connecting consul after ", retry_delay, " seconds")
         core_sleep(retry_delay)
 
-        goto ERR
+        check_keepalive(consul_server, retry_delay)
+        return
     end
 
     log.info("connect consul: ", consul_server.consul_server_url,
-        ", watch_result status: ", watch_result.status,
-        ", watch_result.headers.index: ", watch_result.headers['X-Consul-Index'],
+        ", catalog_result status: ", catalog_res.status,
+        ", catalog_result.headers.index: ", catalog_res.headers['X-Consul-Index'],
         ", consul_server.index: ", consul_server.index,
-        ", consul_server: ", json_delay_encode(consul_server, true))
-
-    -- if current index different last index then update service
-    if consul_server.index ~= watch_result.headers['X-Consul-Index'] then
-        local up_services = core.table.new(0, #watch_result.body)
-        local consul_client_svc = resty_consul:new({
-            host = consul_server.host,
-            port = consul_server.port,
-            connect_timeout = consul_server.connect_timeout,
-            read_timeout = consul_server.read_timeout,
-        })
-        for service_name, _ in pairs(watch_result.body) do
+        ", consul_server: ", json_delay_encode(consul_server))
+
+    -- if the current index is different from the last index, then update the service
+    if (consul_server.catalog_index ~= tonumber(catalog_res.headers['X-Consul-Index']))
+            or (consul_server.health_index ~= tonumber(health_res.headers['X-Consul-Index'])) then
+        local up_services = core.table.new(0, #catalog_res.body)
+        for service_name, _ in pairs(catalog_res.body) do
             -- check if the service_name is 'skip service'
             if skip_service_map[service_name] then
                 goto CONTINUE
             end
+
             -- get node from service
             local svc_url = consul_server.consul_sub_url .. "/" .. service_name
-            local result, err = consul_client_svc:get(svc_url)
-            local error_info = (err ~= nil and err) or
+            local svc_success, result, get_err = pcall(function()
+                return consul_client:get(svc_url, {passing = true})
+            end)
+            local error_info = (get_err ~= nil and get_err) or
                     ((result ~= nil and result.status ~= 200) and result.status)
-            if error_info then
+            if not svc_success or error_info then
                 log.error("connect consul: ", consul_server.consul_server_url,
                     ", by service url: ", svc_url, ", with error: ", error_info)
                 goto CONTINUE
             end
 
             -- decode body, decode json, update service, error handling
-            if result.body then
-                log.notice("service url: ", svc_url,
-                    ", header: ", json_delay_encode(result.headers, true),
-                    ", body: ", json_delay_encode(result.body, true))
+            -- check result body is not nil and not empty
+            if is_not_empty(result.body) then
                 -- add services to table
                 local nodes = up_services[service_name]
-                for  _, node in ipairs(result.body) do
-                    local svc_address, svc_port = node.ServiceAddress, node.ServicePort
-                    if not svc_address then
-                        svc_address = node.Address
+                for _, node in ipairs(result.body) do
+                    if not node.Service then
+                        goto CONTINUE
                     end
+
+                    local svc_address, svc_port = node.Service.Address, node.Service.Port
                     -- if nodes is nil, new nodes table and set to up_services
                     if not nodes then
                         nodes = core.table.new(1, 0)
@@ -270,8 +512,9 @@ function _M.connect(premature, consul_server, retry_delay)
         update_all_services(consul_server.consul_server_url, up_services)
 
         --update events
-        local ok, post_err = events.post(events_list._source, events_list.updating, all_services)
-        if not ok then
+        local post_ok, post_err = events.post(events_list._source,
+                events_list.updating, all_services)
+        if not post_ok then
             log.error("post_event failure with ", events_list._source,
                 ", update all services error: ", post_err)
         end
@@ -280,37 +523,17 @@ function _M.connect(premature, consul_server, retry_delay)
             ngx_timer_at(0, write_dump_services)
         end
 
-        consul_server.index = watch_result.headers['X-Consul-Index']
-        -- only long connect type use index
-        if consul_server.keepalive then
-            consul_server.default_args.index = watch_result.headers['X-Consul-Index']
-        end
+        update_index(consul_server,
+                catalog_res.headers['X-Consul-Index'],
+                health_res.headers['X-Consul-Index'])
     end
 
-    :: ERR ::
-    local keepalive = consul_server.keepalive
-    if keepalive then
-        local ok, err = ngx_timer_at(0, _M.connect, consul_server, retry_delay)
-        if not ok then
-            log.error("create ngx_timer_at got error: ", err)
-            return
-        end
-    end
+    check_keepalive(consul_server, retry_delay)
 end
 
 
 local function format_consul_params(consul_conf)
     local consul_server_list = core.table.new(0, #consul_conf.servers)
-    local args
-
-    if consul_conf.keepalive == false then
-        args = {}
-    elseif consul_conf.keepalive then
-        args = {
-            wait = consul_conf.timeout.wait, --blocked wait!=0; unblocked by wait=0
-            index = 0,
-        }
-    end
 
     for _, v in pairs(consul_conf.servers) do
         local scheme, host, port, path = unpack(http.parse_uri(nil, v))
@@ -319,23 +542,23 @@ local function format_consul_params(consul_conf)
         elseif path ~= "/" or core.string.has_suffix(v, '/') then
             return nil, "invalid consul server address, the valid format: http://address:port"
         end
-
         core.table.insert(consul_server_list, {
             host = host,
             port = port,
             connect_timeout = consul_conf.timeout.connect,
             read_timeout = consul_conf.timeout.read,
-            consul_sub_url = "/catalog/service",
-            consul_watch_sub_url = "/catalog/services",
+            wait_timeout = consul_conf.timeout.wait,
+            consul_watch_catalog_url = "/catalog/services",
+            consul_sub_url = "/health/service",
+            consul_watch_health_url = "/health/state/any",
             consul_server_url = v .. "/v1",
             weight = consul_conf.weight,
             keepalive = consul_conf.keepalive,
-            default_args = args,
-            index = 0,
+            health_index = 0,
+            catalog_index = 0,
             fetch_interval = consul_conf.fetch_interval -- fetch interval to next connect consul
         })
     end
-
     return consul_server_list, nil
 end
 
diff --git a/t/discovery/consul.t b/t/discovery/consul.t
index cf97e0ce8..57a6ab5b4 100644
--- a/t/discovery/consul.t
+++ b/t/discovery/consul.t
@@ -582,3 +582,78 @@ GET /thc
 [{"ip":"127.0.0.1","port":30513,"status":"healthy"},{"ip":"127.0.0.1","port":30514,"status":"healthy"}]
 [{"ip":"127.0.0.1","port":30513,"status":"healthy"},{"ip":"127.0.0.1","port":30514,"status":"healthy"}]
 --- ignore_error_log
+
+
+
+=== TEST 13: test consul catalog service change
+--- yaml_config
+apisix:
+  node_listen: 1984
+deployment:
+  role: data_plane
+  role_data_plane:
+    config_provider: yaml
+discovery:
+  consul:
+    servers:
+      - "http://127.0.0.1:8500"
+    keepalive: false
+    fetch_interval: 3
+    default_service:
+      host: "127.0.0.1"
+      port: 20999
+#END
+--- apisix_yaml
+routes:
+  -
+    uri: /*
+    upstream:
+      service_name: service_a
+      discovery_type: consul
+      type: roundrobin
+#END
+--- config
+location /v1/agent {
+    proxy_pass http://127.0.0.1:8500;
+}
+
+location /sleep {
+    content_by_lua_block {
+        local args = ngx.req.get_uri_args()
+        local sec = args.sec or "2"
+        ngx.sleep(tonumber(sec))
+        ngx.say("ok")
+    }
+}
+--- timeout: 6
+--- request eval
+[
+    "PUT /v1/agent/service/deregister/service_a1",
+    "GET /sleep?sec=3",
+    "GET /hello",
+    "PUT /v1/agent/service/register\n" . "{\"ID\":\"service_a1\",\"Name\":\"service_a\",\"Tags\":[\"primary\",\"v1\"],\"Address\":\"127.0.0.1\",\"Port\":30511,\"Meta\":{\"service_a_version\":\"4.0\"},\"EnableTagOverride\":false,\"Weights\":{\"Passing\":10,\"Warning\":1}}",
+    "GET /sleep?sec=5",
+    "GET /hello",
+    "PUT /v1/agent/service/deregister/service_a1",
+    "GET /sleep?sec=5",
+    "GET /hello",
+    "PUT /v1/agent/service/register\n" . "{\"ID\":\"service_a1\",\"Name\":\"service_a\",\"Tags\":[\"primary\",\"v1\"],\"Address\":\"127.0.0.1\",\"Port\":30511,\"Meta\":{\"service_a_version\":\"4.0\"},\"EnableTagOverride\":false,\"Weights\":{\"Passing\":10,\"Warning\":1}}",
+    "GET /sleep?sec=5",
+    "GET /hello",
+]
+--- response_body_like eval
+[
+    qr//,
+    qr/ok\n/,
+    qr/missing consul services\n/,
+    qr//,
+    qr/ok\n/,
+    qr/server 1\n/,
+    qr//,
+    qr/ok\n/,
+    qr/missing consul services\n/,
+    qr//,
+    qr/ok\n/,
+    qr/server 1\n/,
+]
+--- ignore_error_log
diff --git a/t/discovery/consul_dump.t b/t/discovery/consul_dump.t
index 366c76a98..c74e5b150 100644
--- a/t/discovery/consul_dump.t
+++ b/t/discovery/consul_dump.t
@@ -451,3 +451,55 @@ discovery:
 GET /bonjour
 --- response_body
 {"service_a":[{"host":"127.0.0.1","port":30511,"weight":1}],"service_b":[{"host":"127.0.0.1","port":30517,"weight":1}]}
+
+
+
+=== TEST 16: prepare nodes with consul health check
+--- config
+location /v1/agent {
+    proxy_pass http://127.0.0.1:8500;
+}
+--- request eval
+[
+    "PUT /v1/agent/service/deregister/service_a1",
+    "PUT /v1/agent/service/deregister/service_a2",
+    "PUT /v1/agent/service/deregister/service_b1",
+    "PUT /v1/agent/service/deregister/service_b2",
+    "PUT /v1/agent/service/register\n" . "{\"Checks\": [{\"http\": \"http://baidu.com\",\"interval\": \"1s\"}],\"ID\":\"service_a1\",\"Name\":\"service_a\",\"Tags\":[\"primary\",\"v1\"],\"Address\":\"127.0.0.1\",\"Port\":30511,\"Meta\":{\"service_a_version\":\"4.0\"},\"EnableTagOverride\":false,\"Weights\":{\"Passing\":10,\"Warning\":1}}",
+    "PUT /v1/agent/service/register\n" . "{\"Checks\": [{\"http\": \"http://127.0.0.1:8002\",\"interval\": \"1s\"}],\"ID\":\"service_b1\",\"Name\":\"service_b\",\"Tags\":[\"primary\",\"v1\"],\"Address\":\"127.0.0.1\",\"Port\":8002,\"Meta\":{\"service_b_version\":\"4.1\"},\"EnableTagOverride\":false,\"Weights\":{\"Passing\":10,\"Warning\":1}}",
+]
+--- response_body eval
+--- error_code eval
+[200, 200, 200, 200, 200, 200]
+
+
+
+=== TEST 17: show dump services with consul health check
+--- yaml_config
+apisix:
+  node_listen: 1984
+  enable_control: true
+discovery:
+  consul:
+    servers:
+      - "http://127.0.0.1:8500"
+    dump:
+      path: "consul.dump"
+      load_on_init: false
+--- config
+    location /t {
+        content_by_lua_block {
+            local json = require("toolkit.json")
+            local t = require("lib.test_admin")
+            ngx.sleep(2)
+            local code, body, res = t.test('/v1/discovery/consul/show_dump_file',
+                ngx.HTTP_GET)
+            local entity = json.decode(res)
+            ngx.say(json.encode(entity.services))
+        }
+    }
+--- timeout: 3
+--- request
+GET /t
+--- response_body
+{"service_a":[{"host":"127.0.0.1","port":30511,"weight":1}]}

Reply via email to