monkeyDluffy6017 commented on code in PR #9456:
URL: https://github.com/apache/apisix/pull/9456#discussion_r1200230058
##########
apisix/core/config_etcd.lua:
##########
@@ -75,6 +85,208 @@ local mt = {
}
+local get_etcd -- forward declaration; the function defined below binds to this local
+do
+ local etcd_cli -- client cached across calls; visible only inside this do-block
+
+ function get_etcd() -- returns the cached client, or (client, err) from get_etcd_syncer()
+ if etcd_cli ~= nil then -- a previous call already obtained a client: reuse it
+ return etcd_cli
+ end
+
+ local _, err
+ etcd_cli, _, err = etcd_apisix.get_etcd_syncer() -- second return value is discarded
+ return etcd_cli, err -- NOTE(review): presumably etcd_cli is nil when err is set; confirm
+ end
+end
+
+
+local function cancel_watch(http_cli) -- cancel the watch tied to http_cli and log the outcome
+ local res, err = watch_ctx.cli:watchcancel(http_cli)
+ if res == 1 then -- only a result of exactly 1 is treated as success
+ log.info("cancel watch connection success")
+ else -- any other result is logged as an error; nothing is returned to the caller
+ log.error("cancel watch failed: ", err)
+ end
+end
+
+
+-- append res to the queue and notify pending watchers
+local function produce_res(res, err) -- res and err are stored together as one queue entry
+ if log_level >= NGX_INFO then -- skip the inspect() calls unless info logging is enabled
+ log.info("append res: ", inspect(res), ", err: ", inspect(err))
+ end
+ insert_tab(watch_ctx.res, {res=res, err=err}) -- append the pair to watch_ctx.res
+ for _, sema in pairs(watch_ctx.sema) do -- visit every semaphore held in watch_ctx.sema
+ sema:post()
+ end
+ table.clear(watch_ctx.sema) -- empty the semaphore table once all have been posted
+end
+
+
+local function run_watch(premature)
+ if premature then
+ return
+ end
+
+ local local_conf, err = config_local.local_conf()
+ if not local_conf then
+ error("no local conf: " .. err)
+ end
+ watch_ctx.prefix = local_conf.etcd.prefix .. "/"
+
+ watch_ctx.cli, err = get_etcd()
+ if not watch_ctx.cli then
+ error("failed to create etcd instance: " .. string(err))
+ end
+
+ local rev = 0
+ if loaded_configuration then
+ local _, res = next(loaded_configuration)
+ if res then
+ rev = tonumber(res.headers["X-Etcd-Index"])
+ assert(rev > 0, 'invalid res.headers["X-Etcd-Index"]')
+ end
+ end
+
+ if rev == 0 then
+ while true do
+ local res, err = watch_ctx.cli:get(watch_ctx.prefix)
+ if not res then
+ log.error("etcd get: ", err)
+ ngx_sleep(3)
+ else
+ watch_ctx.rev = tonumber(res.body.header.revision)
+ break
+ end
+ end
+ end
+
+ watch_ctx.rev = rev + 1
+ watch_ctx.started = true
+
+ log.warn("main etcd watcher started, revision=", watch_ctx.rev)
+ for _, sema in pairs(watch_ctx.wait_init) do
+ sema:post()
+ end
+ watch_ctx.wait_init = nil
+
+ local opts = {}
+ opts.timeout = 50 -- second
+ opts.need_cancel = true
+
+ ::restart_watch::
+ while true do
+ opts.start_revision = watch_ctx.rev
+ log.info("restart watchdir: start_revision=", opts.start_revision)
+ local res_func, err, http_cli =
watch_ctx.cli:watchdir(watch_ctx.prefix, opts)
+ if not res_func then
+ log.error("watchdir: ", err)
+ ngx_sleep(3)
+ goto restart_watch
+ end
+
+ ::watch_event::
+ while true do
+ local res, err = res_func()
Review Comment:
This is a very good design. The server response is a stream, so if we don't
close the connection here, we can receive the response events one by one.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]