This is an automated email from the ASF dual-hosted git repository.

tokers pushed a commit to branch fix/batch-processor
in repository https://gitbox.apache.org/repos/asf/apisix.git

commit add5906f87c9131213ef6221ceac1f9595ac3bea
Author: Alex Zhang <[email protected]>
AuthorDate: Mon Nov 30 20:06:20 2020 +0800

    fix: fixed the non effective config update in http-logger
---
 apisix/plugins/http-logger.lua |  46 +++++++++++--
 apisix/plugins/tcp-logger.lua  |   2 +-
 t/plugin/http-logger.t         | 146 +++++++++++++++++++++++++++++++++++++++++
 3 files changed, 187 insertions(+), 7 deletions(-)

diff --git a/apisix/plugins/http-logger.lua b/apisix/plugins/http-logger.lua
index 7a79102..1b2de40 100644
--- a/apisix/plugins/http-logger.lua
+++ b/apisix/plugins/http-logger.lua
@@ -21,19 +21,22 @@ local core            = require("apisix.core")
 local http            = require("resty.http")
 local url             = require("net.url")
 local plugin          = require("apisix.plugin")
+
 local ngx      = ngx
 local tostring = tostring
 local pairs    = pairs
-local ipairs = ipairs
+local ipairs   = ipairs
 local str_byte = string.byte
-
+local timer_at = ngx.timer.at
 
 local plugin_name = "http-logger"
+local stale_timer_running = false
 local buffers = {}
 local lru_log_format = core.lrucache.new({
     ttl = 300, count = 512
 })
 
+local plugin_conf
 
 local schema = {
     type = "object",
@@ -92,6 +95,8 @@ local function send_http_data(conf, log_message)
     local host = url_decoded.host
     local port = url_decoded.port
 
+    core.log.info("sending a batch logs to ", conf.uri)
+
     if ((not port) and url_decoded.scheme == "https") then
         port = 443
     elseif not port then
@@ -169,6 +174,23 @@ local function gen_log_format(metadata)
 end
 
 
+-- remove stale objects from the memory after timer expires
+local function remove_stale_objects(premature)
+    if premature then
+        return
+    end
+
+    for key, batch in ipairs(buffers) do
+        if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 
then
+            core.log.debug("removing batch processor stale object, route id:", 
tostring(key))
+            buffers[key] = nil
+        end
+    end
+
+    stale_timer_running = false
+end
+
+
 function _M.log(conf, ctx)
     local metadata = plugin.plugin_metadata(plugin_name)
     core.log.info("metadata: ", core.json.delay_encode(metadata))
@@ -199,6 +221,17 @@ function _M.log(conf, ctx)
         entry.route_id = "no-matched"
     end
 
+    if not stale_timer_running then
+        -- run the timer every 30 mins if any log is present
+        timer_at(1800, remove_stale_objects)
+        stale_timer_running = true
+    end
+
+    -- always cache the latest plugin conf in the module to avoid the
+    -- closure (method `func`) references to the old conf, in case of
+    -- plugin configuration update, the closure cannot sense the change.
+    plugin_conf = conf
+
     local log_buffer = buffers[entry.route_id]
 
     if log_buffer then
@@ -209,14 +242,15 @@ function _M.log(conf, ctx)
     -- Generate a function to be executed by the batch processor
     local func = function(entries, batch_max_size)
         local data, err
-        if conf.concat_method == "json" then
+
+        if plugin_conf.concat_method == "json" then
             if batch_max_size == 1 then
                 data, err = core.json.encode(entries[1]) -- encode as single {}
             else
                 data, err = core.json.encode(entries) -- encode as array [{}]
             end
 
-        elseif conf.concat_method == "new_line" then
+        elseif plugin_conf.concat_method == "new_line" then
             if batch_max_size == 1 then
                 data, err = core.json.encode(entries[1]) -- encode as single {}
             else
@@ -233,14 +267,14 @@ function _M.log(conf, ctx)
 
         else
             -- defensive programming check
-            err = "unknown concat_method " .. (conf.concat_method or "nil")
+            err = "unknown concat_method " .. (plugin_conf.concat_method or 
"nil")
         end
 
         if not data then
             return false, 'error occurred while encoding the data: ' .. err
         end
 
-        return send_http_data(conf, data)
+        return send_http_data(plugin_conf, data)
     end
 
     local config = {
diff --git a/apisix/plugins/tcp-logger.lua b/apisix/plugins/tcp-logger.lua
index ced5f8f..cf8dc99 100644
--- a/apisix/plugins/tcp-logger.lua
+++ b/apisix/plugins/tcp-logger.lua
@@ -23,7 +23,7 @@ local buffers = {}
 local ngx = ngx
 local tcp = ngx.socket.tcp
 local ipairs   = ipairs
-local stale_timer_running = false;
+local stale_timer_running = false
 local timer_at = ngx.timer.at
 
 local schema = {
diff --git a/t/plugin/http-logger.t b/t/plugin/http-logger.t
index 0b5619f..d007fbb 100644
--- a/t/plugin/http-logger.t
+++ b/t/plugin/http-logger.t
@@ -637,3 +637,149 @@ GET /t
 done
 --- no_error_log
 [error]
+
+
+=== TEST 17: check plugin configuration updating
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body1 = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "http-logger": {
+                                "uri": "http://127.0.0.1:1982/hello";,
+                                "batch_max_size": 1,
+                                "max_retry_count": 1,
+                                "retry_delay": 2,
+                                "buffer_duration": 2,
+                                "inactive_timeout": 2
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1982": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/opentracing"
+                }]],
+                [[{
+                    "node": {
+                        "value": {
+                            "plugins": {
+                                "http-logger": {
+                                    "uri": "http://127.0.0.1:1982/hello";,
+                                    "batch_max_size": 1,
+                                    "max_retry_count": 1,
+                                    "retry_delay": 2,
+                                    "buffer_duration": 2,
+                                    "inactive_timeout": 2
+                                }
+                            },
+                            "upstream": {
+                                "nodes": {
+                                    "127.0.0.1:1982": 1
+                                },
+                                "type": "roundrobin"
+                            },
+                            "uri": "/opentracing"
+                        },
+                        "key": "/apisix/routes/1"
+                    },
+                    "action": "set"
+                }]]
+                )
+
+            if code >= 300 then
+                ngx.status = code
+                ngx.say("fail")
+                return
+            end
+
+            local code, _, body2 = t("/opentracing", "GET")
+            if code >= 300 then
+                ngx.status = code
+                ngx.say("fail")
+                return
+            end
+
+            local code, body3 = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "http-logger": {
+                                "uri": "http://127.0.0.1:1982/hello1";,
+                                "batch_max_size": 1,
+                                "max_retry_count": 1,
+                                "retry_delay": 2,
+                                "buffer_duration": 2,
+                                "inactive_timeout": 2
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1982": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/opentracing"
+                }]],
+                [[{
+                    "node": {
+                        "value": {
+                            "plugins": {
+                                "http-logger": {
+                                    "uri": "http://127.0.0.1:1982/hello1";,
+                                    "batch_max_size": 1,
+                                    "max_retry_count": 1,
+                                    "retry_delay": 2,
+                                    "buffer_duration": 2,
+                                    "inactive_timeout": 2
+                                }
+                            },
+                            "upstream": {
+                                "nodes": {
+                                    "127.0.0.1:1982": 1
+                                },
+                                "type": "roundrobin"
+                            },
+                            "uri": "/opentracing"
+                        },
+                        "key": "/apisix/routes/1"
+                    },
+                    "action": "set"
+                }]]
+                )
+
+            if code >= 300 then
+                ngx.status = code
+                ngx.say("fail")
+                return
+            end
+
+            local code, _, body4 = t("/opentracing", "GET")
+            if code >= 300 then
+                ngx.status = code
+                ngx.say("fail")
+                return
+            end
+
+            ngx.print(body1)
+            ngx.print(body2)
+            ngx.print(body3)
+            ngx.print(body4)
+        }
+    }
+--- request
+GET /t
+--- wait: 0.5
+--- response_body
+passedopentracing
+passedopentracing
+--- grep_error_log eval
+qr/sending a batch logs to http:\/\/127.0.0.1:1982\/hello\d?/
+--- grep_error_log_out
+sending a batch logs to http://127.0.0.1:1982/hello
+sending a batch logs to http://127.0.0.1:1982/hello1

Reply via email to