monkeyDluffy6017 commented on code in PR #9456:
URL: https://github.com/apache/apisix/pull/9456#discussion_r1199005319


##########
apisix/core/config_etcd.lua:
##########
@@ -75,6 +82,203 @@ local mt = {
 }
 
 
+local get_etcd
+do
+    local etcd_cli
+
+    function get_etcd()
+        if etcd_cli ~= nil then
+            return etcd_cli
+        end
+
+        local _, err
+        etcd_cli, _, err = etcd_apisix.get_etcd_syncer()
+        return etcd_cli, err
+    end
+end
+
+
+local function cancel_watch(http_cli)
+    local res, err = watch_ctx.cli:watchcancel(http_cli)
+    if res == 1 then
+        log.info("cancel watch connection success")
+    else
+        log.error("cancel watch failed: ", err)
+    end
+end
+
+
+-- append res to the queue and notify pending watchers
+local function produce_res(res, err)
+    log.info("append res: ", inspect(res), ", err: ", inspect(err))
+    insert_tab(watch_ctx.res, {res=res, err=err})
+    for _, sema in pairs(watch_ctx.sema) do
+        sema:post()
+    end
+    table.clear(watch_ctx.sema)
+end
+
+
+local function run_watch(premature)
+    if premature then
+        return
+    end
+
+    local local_conf, err = config_local.local_conf()
+    if not local_conf then
+        error("no local conf: " .. err)
+    end
+    watch_ctx.prefix = local_conf.etcd.prefix .. "/"
+
+    watch_ctx.cli, err = get_etcd()
+    if not watch_ctx.cli then
+        error("failed to create etcd instance: " .. string(err))
+    end
+
+    local rev = 0
+    if loaded_configuration then
+        local _, res = next(loaded_configuration)
+        if res then
+            rev = tonumber(res.headers["X-Etcd-Index"])
+            assert(rev > 0, 'invalid res.headers["X-Etcd-Index"]')
+        end
+    end
+
+    if rev == 0 then
+        while true do
+            local res, err = watch_ctx.cli:get(watch_ctx.prefix)
+            if not res then
+                log.error("etcd get: ", err)
+                ngx_sleep(3)
+            else
+                watch_ctx.rev = tonumber(res.body.header.revision)
+                break
+            end
+        end
+    end
+
+    watch_ctx.rev = rev + 1
+    watch_ctx.started = true
+
+    log.warn("main etcd watcher started, revision=", watch_ctx.rev)
+    for _, sema in pairs(watch_ctx.wait_init) do
+        sema:post()
+    end
+    watch_ctx.wait_init = nil
+
+    local opts = {}
+    opts.timeout = 50 -- second
+    opts.need_cancel = true
+
+    ::restart_watch::
+    while true do
+        opts.start_revision = watch_ctx.rev
+        log.info("restart watchdir: start_revision=", opts.start_revision)
+        local res_func, err, http_cli = 
watch_ctx.cli:watchdir(watch_ctx.prefix, opts)
+        if not res_func then
+            log.error("watchdir: ", err)
+            ngx_sleep(3)
+            goto restart_watch
+        end
+
+        ::watch_event::
+        while true do
+            local res, err = res_func()
+            log.info("res_func: ", inspect(res))
+
+            if not res then
+                if err ~= "closed" and
+                    err ~= "timeout" and
+                    err ~= "broken pipe" then
+                    log.error("wait watch event: ", err)
+                end
+                cancel_watch(http_cli)
+                break
+            end
+
+            if res.error then
+                log.error("wait watch event: ", inspect(res.error))
+                cancel_watch(http_cli)
+                break
+            end
+
+            if res.result.created then
+                goto watch_event
+            end
+
+            if res.result.canceled then
+                log.warn("watch canceled by etcd, res: ", inspect(res))
+                if res.result.compact_revision then
+                    watch_ctx.rev = tonumber(res.result.compact_revision)
+                    log.warn("etcd compacted, compact_revision=", 
watch_ctx.rev)
+                    produce_res(nil, "compacted")
+                end
+                cancel_watch(http_cli)
+                break
+            end
+
+            -- cleanup
+            local min_idx = 0
+            for _, idx in pairs(watch_ctx.idx) do
+                if (min_idx == 0) or (idx < min_idx) then
+                    min_idx = idx
+                end
+            end
+
+            for i = 1,min_idx-1 do
+                watch_ctx.res[i] = false
+            end
+
+            if min_idx > 100 then
+                for k, idx in pairs(watch_ctx.idx) do
+                    watch_ctx.idx[k] = idx-min_idx+1

Review Comment:
   ```suggestion
                       watch_ctx.idx[k] = idx - min_idx + 1
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to