ElvinEfendi commented on a change in pull request #3300: URL: https://github.com/apache/apisix/pull/3300#discussion_r558321432
########## File path: apisix/balancer/ewma.lua ########## @@ -3,105 +3,125 @@ -- Inspiration drawn from: -- https://github.com/twitter/finagle/blob/1bc837c4feafc0096e43c0e98516a8e1c50c4421 -- /finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/PeakEwma.scala - local core = require("apisix.core") +local nkeys = require("core.table.nkeys") +local resty_lock = require("resty.lock") + local ngx = ngx local ngx_shared = ngx.shared local ngx_now = ngx.now local math = math local pairs = pairs local next = next +local error = error -local _M = {} local DECAY_TIME = 10 -- this value is in seconds +local LOCK_KEY = ":ewma_key" local shm_ewma = ngx_shared.balancer_ewma -local shm_last_touched_at= ngx_shared.balancer_ewma_last_touched_at +local shm_last_touched_at = ngx_shared.balancer_ewma_last_touched_at + +local lrucache_addr = core.lrucache.new({ttl = 300, count = 1024}) +local lrucache_trans_format = core.lrucache.new({ttl = 300, count = 256}) + +local ewma_lock, ewma_lock_err = resty_lock:new("balancer_ewma_locks", + {timeout = 0, exptime = 0.1}) + +local _M = {name = "ewma"} + +local function lock(upstream) + local _, err = ewma_lock:lock(upstream .. 
LOCK_KEY) + if err then + if err ~= "timeout" then + core.log.error("EWMA Balancer failed to lock: ", err) + end + end + + return err +end -local lrucache_addr = core.lrucache.new({ - ttl = 300, count = 1024 -}) -local lrucache_trans_format = core.lrucache.new({ - ttl = 300, count = 256 -}) +local function unlock() + local ok, err = ewma_lock:unlock() + if not ok then + core.log.error("EWMA Balancer failed to unlock: ", err) + end + return err +end local function decay_ewma(ewma, last_touched_at, rtt, now) local td = now - last_touched_at - td = math.max(td, 0) + td = (td > 0) and td or 0 local weight = math.exp(-td / DECAY_TIME) ewma = ewma * weight + rtt * (1.0 - weight) return ewma end - local function store_stats(upstream, ewma, now) local success, err, forcible = shm_last_touched_at:set(upstream, now) if not success then - core.log.error("balancer_ewma_last_touched_at:set failed ", err) + core.log.warn("shm_last_touched_at:set failed: ", err) end if forcible then - core.log.warn("balancer_ewma_last_touched_at:set valid items forcibly overwritten") + core.log + .warn("shm_last_touched_at:set valid items forcibly overwritten") end success, err, forcible = shm_ewma:set(upstream, ewma) if not success then - core.log.error("balancer_ewma:set failed ", err) + core.log.warn("shm_ewma:set failed: ", err) end if forcible then - core.log.warn("balancer_ewma:set valid items forcibly overwritten") + core.log.warn("shm_ewma:set valid items forcibly overwritten") end end - local function get_or_update_ewma(upstream, rtt, update) + local lock_err = nil + if update then Review comment: Not locking for `get` without update will result in potentially incorrect behaviour. Imagine at line 86 you fetch current `ewma` value, then another worker updates it and its `last_touched_at` value before you retrieve `last_touched_at` at line 92. Then you will end up treating the old ewma as a new one. 
I'm not sure whether it would make a big difference in practice, but it is definitely not correct behaviour. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
