This is an automated email from the ASF dual-hosted git repository.

nic443 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new dac0b5546 fix(etcd): upgrade revision when watch request timeout (#12514)
dac0b5546 is described below

commit dac0b5546082229e1d2e44eb3cbce181947ca6b7
Author: Nic <qiany...@api7.ai>
AuthorDate: Mon Aug 18 15:39:26 2025 +0800

    fix(etcd): upgrade revision when watch request timeout (#12514)
    
    Signed-off-by: Nic <qiany...@api7.ai>
    Co-authored-by: YuanSheng Wang <membp...@gmail.com>
---
 apisix/core/config_etcd.lua | 41 +++++++++++++++++++++++++++++++----
 t/core/config_etcd.t        | 52 +++++++++++++++++++++++++++++++++++++++++++++
 t/xrpc/pingpong.t           |  2 +-
 3 files changed, 90 insertions(+), 5 deletions(-)

diff --git a/apisix/core/config_etcd.lua b/apisix/core/config_etcd.lua
index 93e97a93b..995e047c0 100644
--- a/apisix/core/config_etcd.lua
+++ b/apisix/core/config_etcd.lua
@@ -190,6 +190,18 @@ local function do_run_watch(premature)
     opts.need_cancel = true
     opts.start_revision = watch_ctx.rev
 
+    -- get latest revision
+    local res, err = watch_ctx.cli:readdir(watch_ctx.prefix .. "/phantomkey")
+    if err then
+        log.error("failed to get latest revision, err: ", err)
+    end
+    local latest_rev
+    if res and res.body and res.body.header and res.body.header.revision then
+        latest_rev = tonumber(res.body.header.revision)
+    else
+        log.error("failed to get latest revision, res: ", json.delay_encode(res))
+    end
+
     log.info("restart watchdir: start_revision=", opts.start_revision)
 
     local res_func, err, http_cli = watch_ctx.cli:watchdir(watch_ctx.prefix, opts)
@@ -213,6 +225,12 @@ local function do_run_watch(premature)
             then
                 log.error("wait watch event: ", err)
             end
+            if err == "timeout" then
+                if latest_rev and watch_ctx.rev < latest_rev + 1 then
+                    watch_ctx.rev = latest_rev + 1
+                    log.info("etcd watch timeout, upgrade revision to ", watch_ctx.rev)
+                end
+            end
             cancel_watch(http_cli)
             break
         end
@@ -223,10 +241,21 @@ local function do_run_watch(premature)
             break
         end
 
-        if res.result.created then
-            goto watch_event
-        end
-
+        --[[
+        when etcd response permission denied, both result.canceled and result.created are true,
+        so we need to check result.canceled first, for example:
+        result = {
+          cancel_reason = "rpc error: code = PermissionDenied desc = etcdserver: permission denied",
+          canceled = true,
+          created = true,
+          header = {
+            cluster_id = "14841639068965178418",
+            member_id = "10276657743932975437",
+            raft_term = "4",
+            revision = "33"
+          }
+        }
+        --]]
         if res.result.canceled then
             log.warn("watch canceled by etcd, res: ", inspect(res))
             if res.result.compact_revision then
@@ -238,6 +267,10 @@ local function do_run_watch(premature)
             break
         end
 
+        if res.result.created then
+            goto watch_event
+        end
+
         -- cleanup
         local min_idx = 0
         for _, idx in pairs(watch_ctx.idx) do
diff --git a/t/core/config_etcd.t b/t/core/config_etcd.t
index 94251569f..918ae7f37 100644
--- a/t/core/config_etcd.t
+++ b/t/core/config_etcd.t
@@ -514,3 +514,55 @@ qr/main etcd watcher initialised, revision=/
 --- grep_error_log_out
 main etcd watcher initialised, revision=
 main etcd watcher initialised, revision=
+
+
+
+=== TEST 14: watch revision should be upgraded when timeout occurs
+--- yaml_config
+deployment:
+  role: traditional
+  role_traditional:
+    config_provider: etcd
+  etcd:
+    host:
+      - "http://127.0.0.1:2379"
+    watch_timeout: 1
+    prefix: /apisix
+--- extra_yaml_config
+nginx_config:
+    worker_processes: 1
+--- config
+    location /t {
+        content_by_lua_block {
+            local etcd = require("resty.etcd")
+            local etcd_cli, err = etcd.new({
+                http_host = "http://127.0.0.1:2379",
+            })
+            if not etcd_cli then
+                ngx.say("failed to create etcd client: ", err)
+                return
+            end
+            ngx.sleep(2)
+            -- we will assert 4 lines of revision upgrade log because we have one worker and one privileged agent
+            for i = 1, 2 do
+               local _, err = etcd_cli:set("/apache", "apisix")
+               if err then
+                   ngx.say("failed to set key: ", err)
+                   return
+               end
+               ngx.sleep(1)
+            end
+            ngx.say("passed")
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+--- grep_error_log eval
+qr/etcd watch timeout, upgrade revision to/
+--- grep_error_log_out
+etcd watch timeout, upgrade revision to
+etcd watch timeout, upgrade revision to
+etcd watch timeout, upgrade revision to
+etcd watch timeout, upgrade revision to
diff --git a/t/xrpc/pingpong.t b/t/xrpc/pingpong.t
index 65b5346f8..a84358adb 100644
--- a/t/xrpc/pingpong.t
+++ b/t/xrpc/pingpong.t
@@ -154,7 +154,7 @@ pp\x01\x00\x00\x00\x00\x00\x00\x00"
 "pp\x02\x00\x00\x00\x00\x00\x00\x03ABC" x 3
 --- log_level: debug
 --- no_error_log
-stream lua tcp socket set keepalive
+stream lua tcp socket keepalive create connection pool for key "127.0.0.1:1995"
 --- stream_conf_enable
 
 

Reply via email to