This is an automated email from the ASF dual-hosted git repository.
spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new 37efc41 fix: fixed the non-effective plugin configuration update in
tcp/udp-logger plugin (#2928)
37efc41 is described below
commit 37efc410f0d052fff183493f49e43002a392cb44
Author: Alex Zhang <[email protected]>
AuthorDate: Fri Dec 4 23:55:38 2020 +0800
fix: fixed the non-effective plugin configuration update in tcp/udp-logger
plugin (#2928)
---
apisix/plugins/http-logger.lua | 3 +-
apisix/plugins/kafka-logger.lua | 4 +-
apisix/plugins/syslog.lua | 17 ++---
apisix/plugins/tcp-logger.lua | 15 ++---
apisix/plugins/udp-logger.lua | 16 ++---
t/plugin/syslog.t | 135 ++++++++++++++++++++++++++++++++++++++
t/plugin/tcp-logger.t | 139 ++++++++++++++++++++++++++++++++++++++++
t/plugin/udp-logger.t | 139 ++++++++++++++++++++++++++++++++++++++++
8 files changed, 437 insertions(+), 31 deletions(-)
diff --git a/apisix/plugins/http-logger.lua b/apisix/plugins/http-logger.lua
index 4122d1e..9dfab62 100644
--- a/apisix/plugins/http-logger.lua
+++ b/apisix/plugins/http-logger.lua
@@ -180,7 +180,8 @@ local function remove_stale_objects(premature)
for key, batch in ipairs(buffers) do
if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0
then
- core.log.warn("removing batch processor stale object, route id:",
tostring(key))
+ core.log.warn("removing batch processor stale object, conf: ",
+ core.json.delay_encode(key))
buffers[key] = nil
end
end
diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua
index d007dbf..766965b 100644
--- a/apisix/plugins/kafka-logger.lua
+++ b/apisix/plugins/kafka-logger.lua
@@ -25,7 +25,6 @@ local ipairs = ipairs
local plugin_name = "kafka-logger"
local stale_timer_running = false
local timer_at = ngx.timer.at
-local tostring = tostring
local ngx = ngx
local buffers = {}
@@ -109,7 +108,8 @@ local function remove_stale_objects(premature)
for key, batch in ipairs(buffers) do
if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0
then
- core.log.debug("removing batch processor stale object, route id:",
tostring(key))
+ core.log.warn("removing batch processor stale object, conf: ",
+ core.json.delay_encode(key))
buffers[key] = nil
end
end
diff --git a/apisix/plugins/syslog.lua b/apisix/plugins/syslog.lua
index 30262d0..904027e 100644
--- a/apisix/plugins/syslog.lua
+++ b/apisix/plugins/syslog.lua
@@ -25,7 +25,6 @@ local buffers = {}
local ipairs = ipairs
local stale_timer_running = false;
local timer_at = ngx.timer.at
-local tostring = tostring
local schema = {
@@ -82,6 +81,8 @@ local function send_syslog_data(conf, log_message, api_ctx)
local err_msg
local res = true
+ core.log.info("sending a batch logs to ", conf.host, ":", conf.port)
+
-- fetch it from lrucache
local logger, err = core.lrucache.plugin_ctx(
lrucache, api_ctx, nil, logger_socket.new, logger_socket, {
@@ -122,7 +123,8 @@ local function remove_stale_objects(premature)
for key, batch in ipairs(buffers) do
if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0
then
- core.log.debug("removing batch processor stale object, route id:",
tostring(key))
+ core.log.warn("removing batch processor stale object, conf: ",
+ core.json.delay_encode(key))
buffers[key] = nil
end
end
@@ -135,19 +137,14 @@ end
function _M.log(conf, ctx)
local entry = log_util.get_full_log(ngx, conf)
- if not entry.route_id then
- core.log.error("failed to obtain the route id for sys logger")
- return
- end
-
- local log_buffer = buffers[entry.route_id]
-
if not stale_timer_running then
-- run the timer every 30 mins if any log is present
timer_at(1800, remove_stale_objects)
stale_timer_running = true
end
+ local log_buffer = buffers[conf]
+
if log_buffer then
log_buffer:push(entry)
return
@@ -187,7 +184,7 @@ function _M.log(conf, ctx)
return
end
- buffers[entry.route_id] = log_buffer
+ buffers[conf] = log_buffer
log_buffer:push(entry)
end
diff --git a/apisix/plugins/tcp-logger.lua b/apisix/plugins/tcp-logger.lua
index cf8dc99..78a20d0 100644
--- a/apisix/plugins/tcp-logger.lua
+++ b/apisix/plugins/tcp-logger.lua
@@ -68,6 +68,8 @@ local function send_tcp_data(conf, log_message)
sock:settimeout(conf.timeout)
+ core.log.info("sending a batch logs to ", conf.host, ":", conf.port)
+
local ok, err = sock:connect(conf.host, conf.port)
if not ok then
return false, "failed to connect to TCP server: host[" .. conf.host
@@ -106,7 +108,8 @@ local function remove_stale_objects(premature)
for key, batch in ipairs(buffers) do
if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0
then
- core.log.debug("removing batch processor stale object, route id:",
tostring(key))
+ core.log.warn("removing batch processor stale object, conf: ",
+ core.json.delay_encode(key))
buffers[key] = nil
end
end
@@ -118,19 +121,13 @@ end
function _M.log(conf)
local entry = log_util.get_full_log(ngx, conf)
- if not entry.route_id then
- core.log.error("failed to obtain the route id for tcp logger")
- return
- end
-
- local log_buffer = buffers[entry.route_id]
-
if not stale_timer_running then
-- run the timer every 30 mins if any log is present
timer_at(1800, remove_stale_objects)
stale_timer_running = true
end
+ local log_buffer = buffers[conf]
if log_buffer then
log_buffer:push(entry)
return
@@ -169,7 +166,7 @@ function _M.log(conf)
return
end
- buffers[entry.route_id] = log_buffer
+ buffers[conf] = log_buffer
log_buffer:push(entry)
end
diff --git a/apisix/plugins/udp-logger.lua b/apisix/plugins/udp-logger.lua
index cec782a..ea81b5c 100644
--- a/apisix/plugins/udp-logger.lua
+++ b/apisix/plugins/udp-logger.lua
@@ -58,6 +58,9 @@ local function send_udp_data(conf, log_message)
local res = true
local sock = udp()
sock:settimeout(conf.timeout * 1000)
+
+ core.log.info("sending a batch logs to ", conf.host, ":", conf.port)
+
local ok, err = sock:setpeername(conf.host, conf.port)
if not ok then
@@ -89,7 +92,8 @@ local function remove_stale_objects(premature)
for key, batch in ipairs(buffers) do
if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0
then
- core.log.debug("removing batch processor stale object, route id:",
tostring(key))
+ core.log.warn("removing batch processor stale object, conf: ",
+ core.json.delay_encode(key))
buffers[key] = nil
end
end
@@ -101,19 +105,13 @@ end
function _M.log(conf)
local entry = log_util.get_full_log(ngx, conf)
- if not entry.route_id then
- core.log.error("failed to obtain the route id for udp logger")
- return
- end
-
- local log_buffer = buffers[entry.route_id]
-
if not stale_timer_running then
-- run the timer every 30 mins if any log is present
timer_at(1800, remove_stale_objects)
stale_timer_running = true
end
+ local log_buffer = buffers[conf]
if log_buffer then
log_buffer:push(entry)
return
@@ -152,7 +150,7 @@ function _M.log(conf)
return
end
- buffers[entry.route_id] = log_buffer
+ buffers[conf] = log_buffer
log_buffer:push(entry)
end
diff --git a/t/plugin/syslog.t b/t/plugin/syslog.t
index ace3190..ce81234 100644
--- a/t/plugin/syslog.t
+++ b/t/plugin/syslog.t
@@ -267,3 +267,138 @@ hello world
try to lock with key route#1
unlock with key route#1
--- timeout: 5
+
+
+
+=== TEST 8: check plugin configuration updating
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body1 = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "plugins": {
+ "syslog": {
+ "host": "127.0.0.1",
+ "port": 5044,
+ "batch_max_size": 1
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1982": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/opentracing"
+ }]],
+ [[{
+ "node": {
+ "value": {
+ "plugins": {
+ "syslog": {
+ "host": "127.0.0.1",
+ "port": 5044,
+ "batch_max_size": 1
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1982": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/opentracing"
+ },
+ "key": "/apisix/routes/1"
+ },
+ "action": "set"
+ }]]
+ )
+
+ if code >= 300 then
+ ngx.status = code
+ ngx.say("fail")
+ return
+ end
+
+ local code, _, body2 = t("/opentracing", "GET")
+ if code >= 300 then
+ ngx.status = code
+ ngx.say("fail")
+ return
+ end
+
+ local code, body3 = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "plugins": {
+ "syslog": {
+ "host": "127.0.0.1",
+ "port": 5045,
+ "batch_max_size": 1
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1982": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/opentracing"
+ }]],
+ [[{
+ "node": {
+ "value": {
+ "plugins": {
+ "syslog": {
+ "host": "127.0.0.1",
+ "port": 5045,
+ "batch_max_size": 1
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1982": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/opentracing"
+ },
+ "key": "/apisix/routes/1"
+ },
+ "action": "set"
+ }]]
+ )
+
+ if code >= 300 then
+ ngx.status = code
+ ngx.say("fail")
+ return
+ end
+
+ local code, _, body4 = t("/opentracing", "GET")
+ if code >= 300 then
+ ngx.status = code
+ ngx.say("fail")
+ return
+ end
+
+ ngx.print(body1)
+ ngx.print(body2)
+ ngx.print(body3)
+ ngx.print(body4)
+ }
+ }
+--- request
+GET /t
+--- wait: 0.5
+--- response_body
+passedopentracing
+passedopentracing
+--- grep_error_log eval
+qr/sending a batch logs to 127.0.0.1:(\d+)/
+--- grep_error_log_out
+sending a batch logs to 127.0.0.1:5044
+sending a batch logs to 127.0.0.1:5045
diff --git a/t/plugin/tcp-logger.t b/t/plugin/tcp-logger.t
index 05bd3c4..9513864 100644
--- a/t/plugin/tcp-logger.t
+++ b/t/plugin/tcp-logger.t
@@ -234,3 +234,142 @@ GET /t
failed to connect to TCP server: host[312.0.0.1] port[2000]
[error]
--- wait: 3
+
+
+
+=== TEST 7: check plugin configuration updating
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body1 = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "plugins": {
+ "tcp-logger": {
+ "host": "127.0.0.1",
+ "port": 5044,
+ "tls": false,
+ "batch_max_size": 1
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1982": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/opentracing"
+ }]],
+ [[{
+ "node": {
+ "value": {
+ "plugins": {
+ "tcp-logger": {
+ "host": "127.0.0.1",
+ "port": 5044,
+ "tls": false,
+ "batch_max_size": 1
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1982": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/opentracing"
+ },
+ "key": "/apisix/routes/1"
+ },
+ "action": "set"
+ }]]
+ )
+
+ if code >= 300 then
+ ngx.status = code
+ ngx.say("fail")
+ return
+ end
+
+ local code, _, body2 = t("/opentracing", "GET")
+ if code >= 300 then
+ ngx.status = code
+ ngx.say("fail")
+ return
+ end
+
+ local code, body3 = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "plugins": {
+ "tcp-logger": {
+ "host": "127.0.0.1",
+ "port": 5045,
+ "tls": false,
+ "batch_max_size": 1
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1982": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/opentracing"
+ }]],
+ [[{
+ "node": {
+ "value": {
+ "plugins": {
+ "tcp-logger": {
+ "host": "127.0.0.1",
+ "port": 5045,
+ "tls": false,
+ "batch_max_size": 1
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1982": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/opentracing"
+ },
+ "key": "/apisix/routes/1"
+ },
+ "action": "set"
+ }]]
+ )
+
+ if code >= 300 then
+ ngx.status = code
+ ngx.say("fail")
+ return
+ end
+
+ local code, _, body4 = t("/opentracing", "GET")
+ if code >= 300 then
+ ngx.status = code
+ ngx.say("fail")
+ return
+ end
+
+ ngx.print(body1)
+ ngx.print(body2)
+ ngx.print(body3)
+ ngx.print(body4)
+ }
+ }
+--- request
+GET /t
+--- wait: 0.5
+--- response_body
+passedopentracing
+passedopentracing
+--- grep_error_log eval
+qr/sending a batch logs to 127.0.0.1:(\d+)/
+--- grep_error_log_out
+sending a batch logs to 127.0.0.1:5044
+sending a batch logs to 127.0.0.1:5045
diff --git a/t/plugin/udp-logger.t b/t/plugin/udp-logger.t
index 8c08707..aed279b 100644
--- a/t/plugin/udp-logger.t
+++ b/t/plugin/udp-logger.t
@@ -229,3 +229,142 @@ GET /t
failed to connect to UDP server: host[312.0.0.1] port[2000]
[error]
--- wait: 5
+
+
+
+=== TEST 7: check plugin configuration updating
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body1 = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "plugins": {
+ "udp-logger": {
+ "host": "127.0.0.1",
+ "port": 2000,
+ "tls": false,
+ "batch_max_size": 1
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1982": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/opentracing"
+ }]],
+ [[{
+ "node": {
+ "value": {
+ "plugins": {
+ "udp-logger": {
+ "host": "127.0.0.1",
+ "port": 2000,
+ "tls": false,
+ "batch_max_size": 1
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1982": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/opentracing"
+ },
+ "key": "/apisix/routes/1"
+ },
+ "action": "set"
+ }]]
+ )
+
+ if code >= 300 then
+ ngx.status = code
+ ngx.say("fail")
+ return
+ end
+
+ local code, _, body2 = t("/opentracing", "GET")
+ if code >= 300 then
+ ngx.status = code
+ ngx.say("fail")
+ return
+ end
+
+ local code, body3 = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "plugins": {
+ "udp-logger": {
+ "host": "127.0.0.1",
+ "port": 2001,
+ "tls": false,
+ "batch_max_size": 1
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1982": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/opentracing"
+ }]],
+ [[{
+ "node": {
+ "value": {
+ "plugins": {
+ "udp-logger": {
+ "host": "127.0.0.1",
+ "port": 2001,
+ "tls": false,
+ "batch_max_size": 1
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1982": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/opentracing"
+ },
+ "key": "/apisix/routes/1"
+ },
+ "action": "set"
+ }]]
+ )
+
+ if code >= 300 then
+ ngx.status = code
+ ngx.say("fail")
+ return
+ end
+
+ local code, _, body4 = t("/opentracing", "GET")
+ if code >= 300 then
+ ngx.status = code
+ ngx.say("fail")
+ return
+ end
+
+ ngx.print(body1)
+ ngx.print(body2)
+ ngx.print(body3)
+ ngx.print(body4)
+ }
+ }
+--- request
+GET /t
+--- wait: 0.5
+--- response_body
+passedopentracing
+passedopentracing
+--- grep_error_log eval
+qr/sending a batch logs to 127.0.0.1:(\d+)/
+--- grep_error_log_out
+sending a batch logs to 127.0.0.1:2000
+sending a batch logs to 127.0.0.1:2001