tokers commented on a change in pull request #3300:
URL: https://github.com/apache/apisix/pull/3300#discussion_r559077512



##########
File path: apisix/balancer/ewma.lua
##########
@@ -3,105 +3,125 @@
 -- Inspiration drawn from:
 -- https://github.com/twitter/finagle/blob/1bc837c4feafc0096e43c0e98516a8e1c50c4421
 --   /finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/PeakEwma.scala
-
 local core = require("apisix.core")
+local nkeys = require("core.table.nkeys")
+local resty_lock = require("resty.lock")
+
 local ngx = ngx
 local ngx_shared = ngx.shared
 local ngx_now = ngx.now
 local math = math
 local pairs = pairs
 local next = next
+local error = error
 
-local _M = {}
 local DECAY_TIME = 10 -- this value is in seconds
+local LOCK_KEY = ":ewma_key"
 
 local shm_ewma = ngx_shared.balancer_ewma
-local shm_last_touched_at= ngx_shared.balancer_ewma_last_touched_at
+local shm_last_touched_at = ngx_shared.balancer_ewma_last_touched_at
+
+local lrucache_addr = core.lrucache.new({ttl = 300, count = 1024})
+local lrucache_trans_format = core.lrucache.new({ttl = 300, count = 256})
+
+local ewma_lock, ewma_lock_err = resty_lock:new("balancer_ewma_locks",
+                                                {timeout = 0, exptime = 0.1})
+
+local _M = {name = "ewma"}
+
+local function lock(upstream)
+    local _, err = ewma_lock:lock(upstream .. LOCK_KEY)
+    if err then
+        if err ~= "timeout" then
+            core.log.error("EWMA Balancer failed to lock: ", err)
+        end
+    end
+
+    return err
+end
 
-local lrucache_addr = core.lrucache.new({
-    ttl = 300, count = 1024
-})
-local lrucache_trans_format = core.lrucache.new({
-    ttl = 300, count = 256
-})
+local function unlock()
+    local ok, err = ewma_lock:unlock()
+    if not ok then
+        core.log.error("EWMA Balancer failed to unlock: ", err)
+    end
 
+    return err
+end
 
 local function decay_ewma(ewma, last_touched_at, rtt, now)
     local td = now - last_touched_at
-    td = math.max(td, 0)
+    td = (td > 0) and td or 0
     local weight = math.exp(-td / DECAY_TIME)
 
     ewma = ewma * weight + rtt * (1.0 - weight)
     return ewma
 end
 
-
 local function store_stats(upstream, ewma, now)
     local success, err, forcible = shm_last_touched_at:set(upstream, now)
     if not success then
-        core.log.error("balancer_ewma_last_touched_at:set failed ", err)
+        core.log.warn("shm_last_touched_at:set failed: ", err)
     end
     if forcible then
-        core.log.warn("balancer_ewma_last_touched_at:set valid items forcibly overwritten")
+        core.log

Review comment:
       Why break the line here?

##########
File path: apisix/balancer/ewma.lua
##########
@@ -3,105 +3,125 @@
 -- Inspiration drawn from:
 -- https://github.com/twitter/finagle/blob/1bc837c4feafc0096e43c0e98516a8e1c50c4421
 --   /finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/PeakEwma.scala
-
 local core = require("apisix.core")
+local nkeys = require("core.table.nkeys")
+local resty_lock = require("resty.lock")
+
 local ngx = ngx
 local ngx_shared = ngx.shared
 local ngx_now = ngx.now
 local math = math
 local pairs = pairs
 local next = next
+local error = error
 
-local _M = {}
 local DECAY_TIME = 10 -- this value is in seconds
+local LOCK_KEY = ":ewma_key"
 
 local shm_ewma = ngx_shared.balancer_ewma
-local shm_last_touched_at= ngx_shared.balancer_ewma_last_touched_at
+local shm_last_touched_at = ngx_shared.balancer_ewma_last_touched_at
+
+local lrucache_addr = core.lrucache.new({ttl = 300, count = 1024})
+local lrucache_trans_format = core.lrucache.new({ttl = 300, count = 256})
+
+local ewma_lock, ewma_lock_err = resty_lock:new("balancer_ewma_locks",
+                                                {timeout = 0, exptime = 0.1})
+
+local _M = {name = "ewma"}
+
+local function lock(upstream)
+    local _, err = ewma_lock:lock(upstream .. LOCK_KEY)
+    if err then
+        if err ~= "timeout" then
+            core.log.error("EWMA Balancer failed to lock: ", err)
+        end
+    end
+
+    return err
+end
 
-local lrucache_addr = core.lrucache.new({
-    ttl = 300, count = 1024
-})
-local lrucache_trans_format = core.lrucache.new({
-    ttl = 300, count = 256
-})
+local function unlock()
+    local ok, err = ewma_lock:unlock()
+    if not ok then
+        core.log.error("EWMA Balancer failed to unlock: ", err)
+    end
 
+    return err
+end
 
 local function decay_ewma(ewma, last_touched_at, rtt, now)
     local td = now - last_touched_at
-    td = math.max(td, 0)
+    td = (td > 0) and td or 0
     local weight = math.exp(-td / DECAY_TIME)
 
     ewma = ewma * weight + rtt * (1.0 - weight)
     return ewma
 end
 
-
 local function store_stats(upstream, ewma, now)
     local success, err, forcible = shm_last_touched_at:set(upstream, now)
     if not success then
-        core.log.error("balancer_ewma_last_touched_at:set failed ", err)
+        core.log.warn("shm_last_touched_at:set failed: ", err)

Review comment:
       I think the `error` level is reasonable.

##########
File path: apisix/balancer/ewma.lua
##########
@@ -3,105 +3,125 @@
 -- Inspiration drawn from:
 -- https://github.com/twitter/finagle/blob/1bc837c4feafc0096e43c0e98516a8e1c50c4421
 --   /finagle-core/src/main/scala/com/twitter/finagle/loadbalancer/PeakEwma.scala
-
 local core = require("apisix.core")
+local nkeys = require("core.table.nkeys")
+local resty_lock = require("resty.lock")
+
 local ngx = ngx
 local ngx_shared = ngx.shared
 local ngx_now = ngx.now
 local math = math
 local pairs = pairs
 local next = next
+local error = error
 
-local _M = {}
 local DECAY_TIME = 10 -- this value is in seconds
+local LOCK_KEY = ":ewma_key"
 
 local shm_ewma = ngx_shared.balancer_ewma
-local shm_last_touched_at= ngx_shared.balancer_ewma_last_touched_at
+local shm_last_touched_at = ngx_shared.balancer_ewma_last_touched_at
+
+local lrucache_addr = core.lrucache.new({ttl = 300, count = 1024})
+local lrucache_trans_format = core.lrucache.new({ttl = 300, count = 256})
+
+local ewma_lock, ewma_lock_err = resty_lock:new("balancer_ewma_locks",
+                                                {timeout = 0, exptime = 0.1})
+
+local _M = {name = "ewma"}
+
+local function lock(upstream)
+    local _, err = ewma_lock:lock(upstream .. LOCK_KEY)
+    if err then

Review comment:
       We can merge these two `if` conditions into one:
   
   ```lua
   if err and err ~= "timeout" then
   end
   ```
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to