This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new bbff579e9 fix: fix fetch all service info from consul (#8651)
bbff579e9 is described below

commit bbff579e9af19e569fc229f3b61664aa0a1d1c93
Author: fabriceli <[email protected]>
AuthorDate: Wed Feb 8 09:55:20 2023 +0800

    fix: fix fetch all service info from consul (#8651)
    
    Co-authored-by: Fabriceli <[email protected]>
---
 apisix/discovery/consul/init.lua   | 159 +++++++++++++++++++------------------
 conf/config-default.yaml           |   7 +-
 docs/en/latest/discovery/consul.md |   4 +-
 t/discovery/consul_dump.t          |   2 +-
 4 files changed, 89 insertions(+), 83 deletions(-)

diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua
index dd8275e7d..686ab4120 100644
--- a/apisix/discovery/consul/init.lua
+++ b/apisix/discovery/consul/init.lua
@@ -43,16 +43,18 @@ local events
 local events_list
 local consul_services
 
+local default_skip_services = {"consul"}
+
 local _M = {
-    version = 0.1,
+    version = 0.2,
 }
 
 
 local function discovery_consul_callback(data, event, source, pid)
     all_services = data
     log.notice("update local variable all_services, event is: ", event,
-            "source: ", source, "server pid:", pid,
-            ", all services: ", json_delay_encode(all_services, true))
+        "source: ", source, "server pid:", pid,
+        ", all services: ", json_delay_encode(all_services, true))
 end
 
 
@@ -75,44 +77,15 @@ function _M.nodes(service_name)
     end
 
     log.info("process id: ", ngx_worker_id(), ", all_services[", service_name, "] = ",
-            json_delay_encode(resp_list, true))
+        json_delay_encode(resp_list, true))
 
     return resp_list
 end
 
 
-local function parse_instance(node)
-    local service_name, host, port = node.Service, node.Address, node.Port
-    -- if exist, skip special service name
-    if service_name and skip_service_map[service_name] then
-        return false
-    end
-    -- "" means metadata of the service
-    return true, host, tonumber(port), "", service_name
-end
-
-
-local function update_all_services(server_name_prefix, data)
-    local up_services = core.table.new(0, #data)
-    local weight = default_weight
-    for _, node in pairs(data) do
-        local succ, ip, port, metadata, server_name = parse_instance(node)
-        if succ then
-            local nodes = up_services[server_name]
-            if not nodes then
-                nodes = core.table.new(1, 0)
-                up_services[server_name] = nodes
-            end
-            core.table.insert(nodes, {
-                host = ip,
-                port = port,
-                weight = metadata and metadata.weight or weight,
-            })
-        end
-    end
-
+local function update_all_services(consul_server_url, up_services)
     -- clean old unused data
-    local old_services = consul_services[server_name_prefix] or {}
+    local old_services = consul_services[consul_server_url] or {}
     for k, _ in pairs(old_services) do
         all_services[k] = nil
     end
@@ -121,7 +94,7 @@ local function update_all_services(server_name_prefix, data)
     for k, v in pairs(up_services) do
         all_services[k] = v
     end
-    consul_services[server_name_prefix] = up_services
+    consul_services[consul_server_url] = up_services
 
     log.info("update all services: ", json_delay_encode(all_services, true))
 end
@@ -154,7 +127,7 @@ local function read_dump_services()
 
     local now_time = ngx.time()
     log.info("dump file last_update: ", entity.last_update, ", dump_params.expire: ",
-            dump_params.expire, ", now_time: ", now_time)
+        dump_params.expire, ", now_time: ", now_time)
     if dump_params.expire ~= 0  and (entity.last_update + dump_params.expire) < now_time then
         log.warn("dump file: ", dump_params.path, " had expired, ignored it")
         return
@@ -223,9 +196,9 @@ function _M.connect(premature, consul_server, retry_delay)
             and watch_result.status)
     if watch_error_info then
         log.error("connect consul: ", consul_server.consul_server_url,
-                " by sub url: ", consul_server.consul_watch_sub_url,
-                ", got watch result: ", json_delay_encode(watch_result, true),
-                 ", with error: ", watch_error_info)
+            " by sub url: ", consul_server.consul_watch_sub_url,
+            ", got watch result: ", json_delay_encode(watch_result, true),
+            ", with error: ", watch_error_info)
 
         retry_delay = get_retry_delay(retry_delay)
         log.warn("retry connecting consul after ", retry_delay, " seconds")
@@ -235,31 +208,76 @@ function _M.connect(premature, consul_server, retry_delay)
     end
 
     log.info("connect consul: ", consul_server.consul_server_url,
-            ", watch_result status: ", watch_result.status,
-            ", watch_result.headers.index: ", watch_result.headers['X-Consul-Index'],
-            ", consul_server.index: ", consul_server.index,
-            ", consul_server: ", json_delay_encode(consul_server, true))
+        ", watch_result status: ", watch_result.status,
+        ", watch_result.headers.index: ", watch_result.headers['X-Consul-Index'],
+        ", consul_server.index: ", consul_server.index,
+        ", consul_server: ", json_delay_encode(consul_server, true))
 
     -- if current index different last index then update service
     if consul_server.index ~= watch_result.headers['X-Consul-Index'] then
+        local up_services = core.table.new(0, #watch_result.body)
+        local consul_client_svc = resty_consul:new({
+            host = consul_server.host,
+            port = consul_server.port,
+            connect_timeout = consul_server.connect_timeout,
+            read_timeout = consul_server.read_timeout,
+        })
+        for service_name, _ in pairs(watch_result.body) do
+            -- check if the service_name is 'skip service'
+            if skip_service_map[service_name] then
+                goto CONTINUE
+            end
+            -- get node from service
+            local svc_url = consul_server.consul_sub_url .. "/" .. service_name
+            local result, err = consul_client_svc:get(svc_url)
+            local error_info = (err ~= nil and err) or
+                    ((result ~= nil and result.status ~= 200) and result.status)
+            if error_info then
+                log.error("connect consul: ", consul_server.consul_server_url,
+                    ", by service url: ", svc_url, ", with error: ", error_info)
+                goto CONTINUE
+            end
 
-        -- fetch all services info
-        local result, err = consul_client:get(consul_server.consul_sub_url)
-
-        local error_info = (err ~= nil and err) or
-         ((result ~= nil and result.status ~= 200) and result.status)
+            -- decode body, decode json, update service, error handling
+            if result.body then
+                log.notice("service url: ", svc_url,
+                    ", header: ", json_delay_encode(result.headers, true),
+                    ", body: ", json_delay_encode(result.body, true))
+                -- add services to table
+                local nodes = up_services[service_name]
+                for  _, node in ipairs(result.body) do
+                    local svc_address, svc_port = node.ServiceAddress, node.ServicePort
+                    if not svc_address then
+                        svc_address = node.Address
+                    end
+                    -- if nodes is nil, new nodes table and set to up_services
+                    if not nodes then
+                        nodes = core.table.new(1, 0)
+                        up_services[service_name] = nodes
+                    end
+                    -- add node to nodes table
+                    core.table.insert(nodes, {
+                        host = svc_address,
+                        port = tonumber(svc_port),
+                        weight = default_weight,
+                    })
+                end
+                up_services[service_name] = nodes
+            end
+            :: CONTINUE ::
+        end
 
-        if error_info then
-            log.error("connect consul: ", consul_server.consul_server_url,
-                    " by sub url: ", consul_server.consul_sub_url,
-                    ", got result: ", json_delay_encode(result, true),
-                    ", with error: ", error_info)
+        update_all_services(consul_server.consul_server_url, up_services)
 
-            retry_delay = get_retry_delay(retry_delay)
-            log.warn("retry connecting consul after ", retry_delay, " seconds")
-            core_sleep(retry_delay)
+        --update events
+        local ok, post_err = events.post(events_list._source, events_list.updating, all_services)
+        if not ok then
+            log.error("post_event failure with ", events_list._source,
+                ", update all services error: ", post_err)
+        end
 
-            goto ERR
+        if dump_params then
+            ngx_timer_at(0, write_dump_services)
         end
 
         consul_server.index = watch_result.headers['X-Consul-Index']
@@ -267,23 +285,6 @@ function _M.connect(premature, consul_server, retry_delay)
         if consul_server.keepalive then
         consul_server.default_args.index = watch_result.headers['X-Consul-Index']
         end
-        -- decode body, decode json, update service, error handling
-        if result.body then
-            log.notice("server_name: ", consul_server.consul_server_url,
-                    ", header: ", json_delay_encode(result.headers, true),
-                    ", body: ", json_delay_encode(result.body, true))
-            update_all_services(consul_server.consul_server_url, result.body)
-            --update events
-            local ok, err = events.post(events_list._source, events_list.updating, all_services)
-            if not ok then
-                log.error("post_event failure with ", events_list._source,
-                        ", update all services error: ", err)
-            end
-
-            if dump_params then
-                ngx_timer_at(0, write_dump_services)
-            end
-        end
     end
 
     :: ERR ::
@@ -324,7 +325,7 @@ local function format_consul_params(consul_conf)
             port = port,
             connect_timeout = consul_conf.timeout.connect,
             read_timeout = consul_conf.timeout.read,
-            consul_sub_url = "/agent/services",
+            consul_sub_url = "/catalog/service",
             consul_watch_sub_url = "/catalog/services",
             consul_server_url = v .. "/v1",
             weight = consul_conf.weight,
@@ -375,10 +376,14 @@ function _M.init_worker()
             skip_service_map[v] = true
         end
     end
+    -- set up default skip service
+    for _, v in ipairs(default_skip_services) do
+        skip_service_map[v] = true
+    end
 
     local consul_servers_list, err = format_consul_params(consul_conf)
     if err then
-        error(err)
+        error("format consul config got error: " .. err)
     end
     log.info("consul_server_list: ", json_delay_encode(consul_servers_list, true))
 
diff --git a/conf/config-default.yaml b/conf/config-default.yaml
index 43bbad005..3fcd7505b 100755
--- a/conf/config-default.yaml
+++ b/conf/config-default.yaml
@@ -305,11 +305,11 @@ nginx_config:                     # config for render the template to generate n
 #       path: "logs/consul_kv.dump"
 #       expire: 2592000            # unit sec, here is 30 day
 #  consul:
-#    servers:
-#      - "http://127.0.0.1:8500"
+#    servers:                          # make sure service name is unique in these consul servers
+#      - "http://127.0.0.1:8500"       # `http://127.0.0.1:8500` and `http://127.0.0.1:8600` are different clusters
 #      - "http://127.0.0.1:8600"
 #    skip_services:                    # if you need to skip special services
-#      - "service_a"
+#      - "service_a"                   # `consul` service is default skip service
 #    timeout:
 #      connect: 2000               # default 2000 ms
 #      read: 2000                  # default 2000 ms
@@ -327,6 +327,7 @@ nginx_config:                     # config for render the template to generate n
 #    dump:                         # if you need, when registered nodes updated can dump into file
 #       path: "logs/consul.dump"
 #       expire: 2592000            # unit sec, here is 30 day
+#       load_on_init: true         # default true, load the consul dump file on init
 #  kubernetes:
 #    ### kubernetes service discovery both support single-cluster and multi-cluster mode
 #    ### applicable to the case where the service is distributed in a single or multiple kubernetes clusters.
diff --git a/docs/en/latest/discovery/consul.md b/docs/en/latest/discovery/consul.md
index 2db7af144..e31a83690 100644
--- a/docs/en/latest/discovery/consul.md
+++ b/docs/en/latest/discovery/consul.md
@@ -35,8 +35,8 @@ First of all, we need to add following configuration in `conf/config.yaml` :
 discovery:
   consul:
     servers:                      # make sure service name is unique in these consul servers
-      - "http://127.0.0.1:8500"
-      - "http://127.0.0.1:8600"   # `http://127.0.0.1:8500` and `http://127.0.0.1:8600` are different clusters
+      - "http://127.0.0.1:8500"   # `http://127.0.0.1:8500` and `http://127.0.0.1:8600` are different clusters
+      - "http://127.0.0.1:8600"   # `consul` service is default skip service
     skip_services:                # if you need to skip special services
       - "service_a"
     timeout:
diff --git a/t/discovery/consul_dump.t b/t/discovery/consul_dump.t
index b229fb7a4..366c76a98 100644
--- a/t/discovery/consul_dump.t
+++ b/t/discovery/consul_dump.t
@@ -77,7 +77,7 @@ discovery:
       - "http://127.0.0.1:8500"
     dump:
       path: "consul.dump"
-      load_on_init: true
+      load_on_init: false
 --- config
     location /t {
         content_by_lua_block {

Reply via email to