This is an automated email from the ASF dual-hosted git repository. tokers pushed a commit to branch fix/batch-processor in repository https://gitbox.apache.org/repos/asf/apisix.git
commit add5906f87c9131213ef6221ceac1f9595ac3bea Author: Alex Zhang <[email protected]> AuthorDate: Mon Nov 30 20:06:20 2020 +0800 fix: fixed the non effective config update in http-logger --- apisix/plugins/http-logger.lua | 46 +++++++++++-- apisix/plugins/tcp-logger.lua | 2 +- t/plugin/http-logger.t | 146 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 187 insertions(+), 7 deletions(-) diff --git a/apisix/plugins/http-logger.lua b/apisix/plugins/http-logger.lua index 7a79102..1b2de40 100644 --- a/apisix/plugins/http-logger.lua +++ b/apisix/plugins/http-logger.lua @@ -21,19 +21,22 @@ local core = require("apisix.core") local http = require("resty.http") local url = require("net.url") local plugin = require("apisix.plugin") + local ngx = ngx local tostring = tostring local pairs = pairs -local ipairs = ipairs +local ipairs = ipairs local str_byte = string.byte - +local timer_at = ngx.timer.at local plugin_name = "http-logger" +local stale_timer_running = false local buffers = {} local lru_log_format = core.lrucache.new({ ttl = 300, count = 512 }) +local plugin_conf local schema = { type = "object", @@ -92,6 +95,8 @@ local function send_http_data(conf, log_message) local host = url_decoded.host local port = url_decoded.port + core.log.info("sending a batch logs to ", conf.uri) + if ((not port) and url_decoded.scheme == "https") then port = 443 elseif not port then @@ -169,6 +174,23 @@ local function gen_log_format(metadata) end +-- remove stale objects from the memory after timer expires +local function remove_stale_objects(premature) + if premature then + return + end + + for key, batch in ipairs(buffers) do + if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then + core.log.debug("removing batch processor stale object, route id:", tostring(key)) + buffers[key] = nil + end + end + + stale_timer_running = false +end + + function _M.log(conf, ctx) local metadata = plugin.plugin_metadata(plugin_name) core.log.info("metadata: ", core.json.delay_encode(metadata)) @@ -199,6 +221,17 @@ function _M.log(conf, ctx) entry.route_id = "no-matched" end + if not stale_timer_running then + -- run the timer every 30 mins if any log is present + timer_at(1800, remove_stale_objects) + stale_timer_running = true + end + + -- always cache the latest plugin conf in the module to avoid the + -- closure (method `func`) references to the old conf, in case of + -- plugin configuration update, the closure cannot sense the change. + plugin_conf = conf + local log_buffer = buffers[entry.route_id] if log_buffer then @@ -209,14 +242,15 @@ function _M.log(conf, ctx) -- Generate a function to be executed by the batch processor local func = function(entries, batch_max_size) local data, err - if conf.concat_method == "json" then + + if plugin_conf.concat_method == "json" then if batch_max_size == 1 then data, err = core.json.encode(entries[1]) -- encode as single {} else data, err = core.json.encode(entries) -- encode as array [{}] end - elseif conf.concat_method == "new_line" then + elseif plugin_conf.concat_method == "new_line" then if batch_max_size == 1 then data, err = core.json.encode(entries[1]) -- encode as single {} else @@ -233,14 +267,14 @@ function _M.log(conf, ctx) else -- defensive programming check - err = "unknown concat_method " .. (conf.concat_method or "nil") + err = "unknown concat_method " .. (plugin_conf.concat_method or "nil") end if not data then return false, 'error occurred while encoding the data: ' .. err end - return send_http_data(conf, data) + return send_http_data(plugin_conf, data) end local config = { diff --git a/apisix/plugins/tcp-logger.lua b/apisix/plugins/tcp-logger.lua index ced5f8f..cf8dc99 100644 --- a/apisix/plugins/tcp-logger.lua +++ b/apisix/plugins/tcp-logger.lua @@ -23,7 +23,7 @@ local buffers = {} local ngx = ngx local tcp = ngx.socket.tcp local ipairs = ipairs -local stale_timer_running = false; +local stale_timer_running = false local timer_at = ngx.timer.at local schema = { diff --git a/t/plugin/http-logger.t b/t/plugin/http-logger.t index 0b5619f..d007fbb 100644 --- a/t/plugin/http-logger.t +++ b/t/plugin/http-logger.t @@ -637,3 +637,149 @@ GET /t done --- no_error_log [error] + + +=== TEST 17: check plugin configuration updating +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body1 = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "http-logger": { + "uri": "http://127.0.0.1:1982/hello", + "batch_max_size": 1, + "max_retry_count": 1, + "retry_delay": 2, + "buffer_duration": 2, + "inactive_timeout": 2 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1982": 1 + }, + "type": "roundrobin" + }, + "uri": "/opentracing" + }]], + [[{ + "node": { + "value": { + "plugins": { + "http-logger": { + "uri": "http://127.0.0.1:1982/hello", + "batch_max_size": 1, + "max_retry_count": 1, + "retry_delay": 2, + "buffer_duration": 2, + "inactive_timeout": 2 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1982": 1 + }, + "type": "roundrobin" + }, + "uri": "/opentracing" + }, + "key": "/apisix/routes/1" + }, + "action": "set" + }]] + ) + + if code >= 300 then + ngx.status = code + ngx.say("fail") + return + end + + local code, _, body2 = t("/opentracing", "GET") + if code >= 300 then + ngx.status = code + ngx.say("fail") + return + end + + local code, body3 = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "http-logger": { + "uri": "http://127.0.0.1:1982/hello1", + "batch_max_size": 1, + "max_retry_count": 1, + "retry_delay": 2, + "buffer_duration": 2, + "inactive_timeout": 2 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1982": 1 + }, + "type": "roundrobin" + }, + "uri": "/opentracing" + }]], + [[{ + "node": { + "value": { + "plugins": { + "http-logger": { + "uri": "http://127.0.0.1:1982/hello1", + "batch_max_size": 1, + "max_retry_count": 1, + "retry_delay": 2, + "buffer_duration": 2, + "inactive_timeout": 2 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1982": 1 + }, + "type": "roundrobin" + }, + "uri": "/opentracing" + }, + "key": "/apisix/routes/1" + }, + "action": "set" + }]] + ) + + if code >= 300 then + ngx.status = code + ngx.say("fail") + return + end + + local code, _, body4 = t("/opentracing", "GET") + if code >= 300 then + ngx.status = code + ngx.say("fail") + return + end + + ngx.print(body1) + ngx.print(body2) + ngx.print(body3) + ngx.print(body4) + } + } +--- request +GET /t +--- wait: 0.5 +--- response_body +passedopentracing +passedopentracing +--- grep_error_log eval +qr/sending a batch logs to http:\/\/127.0.0.1:1982\/hello\d?/ +--- grep_error_log_out +sending a batch logs to http://127.0.0.1:1982/hello +sending a batch logs to http://127.0.0.1:1982/hello1
