This is an automated email from the ASF dual-hosted git repository. nic443 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push: new dac0b5546 fix(etcd): upgrade revision when watch request timeout (#12514) dac0b5546 is described below commit dac0b5546082229e1d2e44eb3cbce181947ca6b7 Author: Nic <qiany...@api7.ai> AuthorDate: Mon Aug 18 15:39:26 2025 +0800 fix(etcd): upgrade revision when watch request timeout (#12514) Signed-off-by: Nic <qiany...@api7.ai> Co-authored-by: YuanSheng Wang <membp...@gmail.com> --- apisix/core/config_etcd.lua | 41 +++++++++++++++++++++++++++++++---- t/core/config_etcd.t | 52 +++++++++++++++++++++++++++++++++++++++++++++ t/xrpc/pingpong.t | 2 +- 3 files changed, 90 insertions(+), 5 deletions(-) diff --git a/apisix/core/config_etcd.lua b/apisix/core/config_etcd.lua index 93e97a93b..995e047c0 100644 --- a/apisix/core/config_etcd.lua +++ b/apisix/core/config_etcd.lua @@ -190,6 +190,18 @@ local function do_run_watch(premature) opts.need_cancel = true opts.start_revision = watch_ctx.rev + -- get latest revision + local res, err = watch_ctx.cli:readdir(watch_ctx.prefix .. "/phantomkey") + if err then + log.error("failed to get latest revision, err: ", err) + end + local latest_rev + if res and res.body and res.body.header and res.body.header.revision then + latest_rev = tonumber(res.body.header.revision) + else + log.error("failed to get latest revision, res: ", json.delay_encode(res)) + end + log.info("restart watchdir: start_revision=", opts.start_revision) local res_func, err, http_cli = watch_ctx.cli:watchdir(watch_ctx.prefix, opts) @@ -213,6 +225,12 @@ local function do_run_watch(premature) then log.error("wait watch event: ", err) end + if err == "timeout" then + if latest_rev and watch_ctx.rev < latest_rev + 1 then + watch_ctx.rev = latest_rev + 1 + log.info("etcd watch timeout, upgrade revision to ", watch_ctx.rev) + end + end cancel_watch(http_cli) break end @@ -223,10 +241,21 @@ local function do_run_watch(premature) break end - if res.result.created then - goto watch_event - end - + --[[ + when etcd response permission denied, both result.canceled and result.created are true, + so we need to check result.canceled first, for example: + result = { + cancel_reason = "rpc error: code = PermissionDenied desc = etcdserver: permission denied", + canceled = true, + created = true, + header = { + cluster_id = "14841639068965178418", + member_id = "10276657743932975437", + raft_term = "4", + revision = "33" + } + } + --]] if res.result.canceled then log.warn("watch canceled by etcd, res: ", inspect(res)) if res.result.compact_revision then @@ -238,6 +267,10 @@ local function do_run_watch(premature) break end + if res.result.created then + goto watch_event + end + -- cleanup local min_idx = 0 for _, idx in pairs(watch_ctx.idx) do diff --git a/t/core/config_etcd.t b/t/core/config_etcd.t index 94251569f..918ae7f37 100644 --- a/t/core/config_etcd.t +++ b/t/core/config_etcd.t @@ -514,3 +514,55 @@ qr/main etcd watcher initialised, revision=/ --- grep_error_log_out main etcd watcher initialised, revision= main etcd watcher initialised, revision= + + + +=== TEST 14: watch revision should be upgraded when timeout occurs +--- yaml_config +deployment: + role: traditional + role_traditional: + config_provider: etcd + etcd: + host: + - "http://127.0.0.1:2379" + watch_timeout: 1 + prefix: /apisix +--- extra_yaml_config +nginx_config: + worker_processes: 1 +--- config + location /t { + content_by_lua_block { + local etcd = require("resty.etcd") + local etcd_cli, err = etcd.new({ + http_host = "http://127.0.0.1:2379", + }) + if not etcd_cli then + ngx.say("failed to create etcd client: ", err) + return + end + ngx.sleep(2) + -- we will assert 4 lines of revision upgrade log because we have one worker and one privileged agent + for i = 1, 2 do + local _, err = etcd_cli:set("/apache", "apisix") + if err then + ngx.say("failed to set key: ", err) + return + end + ngx.sleep(1) + end + ngx.say("passed") + } + } +--- request +GET /t +--- response_body +passed +--- grep_error_log eval +qr/etcd watch timeout, upgrade revision to/ +--- grep_error_log_out +etcd watch timeout, upgrade revision to +etcd watch timeout, upgrade revision to +etcd watch timeout, upgrade revision to +etcd watch timeout, upgrade revision to diff --git a/t/xrpc/pingpong.t b/t/xrpc/pingpong.t index 65b5346f8..a84358adb 100644 --- a/t/xrpc/pingpong.t +++ b/t/xrpc/pingpong.t @@ -154,7 +154,7 @@ pp\x01\x00\x00\x00\x00\x00\x00\x00" "pp\x02\x00\x00\x00\x00\x00\x00\x03ABC" x 3 --- log_level: debug --- no_error_log -stream lua tcp socket set keepalive +stream lua tcp socket keepalive create connection pool for key "127.0.0.1:1995" --- stream_conf_enable