This is an automated email from the ASF dual-hosted git repository.
wenming pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-apisix.git
The following commit(s) were added to refs/heads/master by this push:
new 25e1101 feature: Updating the UDP logger to use the batch processor
util (#1355)
25e1101 is described below
commit 25e110135a7183a74de2270bcea32b78dfe195fc
Author: Nirojan Selvanathan <[email protected]>
AuthorDate: Fri Apr 3 10:33:32 2020 +0200
feature: Updating the UDP logger to use the batch processor util (#1355)
---
apisix/plugins/udp-logger.lua | 92 +++++++++++++++++++++++++++++++++----------
doc/plugins/udp-logger.md | 25 +++++++-----
t/plugin/udp-logger.t | 14 ++++---
3 files changed, 95 insertions(+), 36 deletions(-)
diff --git a/apisix/plugins/udp-logger.lua b/apisix/plugins/udp-logger.lua
index d28c5db..461767b 100644
--- a/apisix/plugins/udp-logger.lua
+++ b/apisix/plugins/udp-logger.lua
@@ -14,12 +14,13 @@
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
-local core = require("apisix.core")
+local core = require("apisix.core")
local log_util = require("apisix.utils.log-util")
+local batch_processor = require("apisix.utils.batch-processor")
local plugin_name = "udp-logger"
+local tostring = tostring
+local buffers = {}
local ngx = ngx
-
-local timer_at = ngx.timer.at
local udp = ngx.socket.udp
local schema = {
@@ -27,9 +28,11 @@ local schema = {
properties = {
host = {type = "string"},
port = {type = "integer", minimum = 0},
- timeout = { -- timeout in milliseconds
- type = "integer", minimum = 1, default= 1000
- }
+ timeout = {type = "integer", minimum = 1, default = 3},
+ name = {type = "string", default = "udp logger"},
+ buffer_duration = {type = "integer", minimum = 1, default = 60},
+ inactive_timeout = {type = "integer", minimum = 1, default = 5},
+ batch_max_size = {type = "integer", minimum = 1, default = 1000},
},
required = {"host", "port"}
}
@@ -46,38 +49,85 @@ function _M.check_schema(conf)
return core.schema.check(schema, conf)
end
-local function log(premature, conf, log_message)
- if premature then
- return
- end
-
+local function send_udp_data(conf, log_message)
+ local err_msg
+ local res = true
local sock = udp()
- sock:settimeout(conf.timeout)
-
+ sock:settimeout(conf.timeout * 1000)
local ok, err = sock:setpeername(conf.host, conf.port)
+
if not ok then
- core.log.error("failed to connect to UDP server: host[",
- conf.host, "] port[", conf.port, "] ", err)
- return
+ return nil, "failed to connect to UDP server: host[" .. conf.host
+ .. "] port[" .. tostring(conf.port) .. "] err: " .. err
end
ok, err = sock:send(log_message)
if not ok then
- core.log.error("failed to send data to UDP server: host[",
- conf.host, "] port[", conf.port, "] ", err)
+ res = false
+ err_msg = "failed to send data to UDP server: host[" .. conf.host
+ .. "] port[" .. tostring(conf.port) .. "] err:" .. err
end
ok, err = sock:close()
if not ok then
core.log.error("failed to close the UDP connection, host[",
- conf.host, "] port[", conf.port, "] ", err)
+ conf.host, "] port[", conf.port, "] ", err)
end
+
+ return res, err_msg
end
function _M.log(conf)
- return timer_at(0, log, conf, core.json.encode(log_util.get_full_log(ngx)))
+ local entry = log_util.get_full_log(ngx)
+
+ if not entry.route_id then
+ core.log.error("failed to obtain the route id for tcp logger")
+ return
+ end
+
+ local log_buffer = buffers[entry.route_id]
+
+ if log_buffer then
+ log_buffer:push(entry)
+ return
+ end
+
+ -- Generate a function to be executed by the batch processor
+ local func = function(entries, batch_max_size)
+ local data, err
+ if batch_max_size == 1 then
+ data, err = core.json.encode(entries[1]) -- encode as single {}
+ else
+ data, err = core.json.encode(entries) -- encode as array [{}]
+ end
+
+ if not data then
+ return false, 'error occurred while encoding the data: ' .. err
+ end
+
+ return send_udp_data(conf, data)
+ end
+
+ local config = {
+ name = conf.name,
+ retry_delay = conf.retry_delay,
+ batch_max_size = conf.batch_max_size,
+ max_retry_count = conf.max_retry_count,
+ buffer_duration = conf.buffer_duration,
+ inactive_timeout = conf.inactive_timeout,
+ }
+
+ local err
+ log_buffer, err = batch_processor:new(func, config)
+
+ if not log_buffer then
+ core.log.error("error when creating the batch processor: ", err)
+ return
+ end
+
+ buffers[entry.route_id] = log_buffer
+ log_buffer:push(entry)
end
return _M
-
diff --git a/doc/plugins/udp-logger.md b/doc/plugins/udp-logger.md
index a2753d8..e6e7149 100644
--- a/doc/plugins/udp-logger.md
+++ b/doc/plugins/udp-logger.md
@@ -33,15 +33,19 @@ This will provide the ability to send Log data requests as
JSON objects to Monit
## Attributes
-|Name |Requirement |Description|
-|--------- |--------|-----------|
-| host |required| IP address or the Hostname of the UDP server.|
-| port |required| Target upstream port.|
-| timeout |optional|Timeout for the upstream to send data.|
+|Name |Requirement |Description|
+|--------- |-------- |-----------|
+|host |required | IP address or the Hostname of the UDP
server.|
+|port |required | Target upstream port.|
+|timeout |optional |Timeout in seconds for the upstream to send data, default is 3|
+|name |optional |A unique identifier to identify the batch
processor|
+|batch_max_size |optional |Max size of each batch, default is 1000|
+|inactive_timeout|optional |Maximum time in seconds the buffer may stay
inactive before it is flushed, default is 5|
+|buffer_duration|optional |Maximum age in seconds of the oldest entry in
a batch before the batch must be processed, default is 60|
## How To Enable
-1. Here is an examle on how to enable udp-logger plugin for a specific route.
+The following is an example of how to enable the udp-logger for a specific
route.
```shell
curl http://127.0.0.1:9080/apisix/admin/consumers -H 'X-API-KEY:
edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
@@ -51,7 +55,9 @@ curl http://127.0.0.1:9080/apisix/admin/consumers -H
'X-API-KEY: edd1c9f034335f1
"plugins": {
"udp-logger": {
"host": "127.0.0.1",
- "port": 3000
+ "port": 3000,
+ "batch_max_size": 1,
+ "name": "udp logger"
}
},
"upstream": {
@@ -78,9 +84,8 @@ hello, world
## Disable Plugin
-When you want to disable the `udp-logger` plugin, it is very simple,
- you can delete the corresponding json configuration in the plugin
configuration,
- no need to restart the service, it will take effect immediately:
+Remove the corresponding JSON configuration in the plugin configuration to
disable the `udp-logger`.
+APISIX plugins are hot-reloaded, so there is no need to restart APISIX.
```shell
$ curl http://127.0.0.1:2379/apisix/admin/routes/1 -X PUT -d value='
diff --git a/t/plugin/udp-logger.t b/t/plugin/udp-logger.t
index e30066c..3112680 100644
--- a/t/plugin/udp-logger.t
+++ b/t/plugin/udp-logger.t
@@ -102,7 +102,8 @@ done
"plugins": {
"udp-logger": {
"host": "127.0.0.1",
- "port": 2000
+ "port": 2000,
+ "batch_max_size": 1
}
},
"upstream": {
@@ -119,7 +120,8 @@ done
"plugins": {
"udp-logger": {
"host": "127.0.0.1",
- "port": 2000
+ "port": 2000,
+ "batch_max_size": 1
}
},
"upstream": {
@@ -173,7 +175,8 @@ opentracing
"plugins": {
"udp-logger": {
"host": "312.0.0.1",
- "port": 2000
+ "port": 2000,
+ "batch_max_size": 1
}
},
"upstream": {
@@ -190,7 +193,8 @@ opentracing
"plugins": {
"udp-logger": {
"host": "312.0.0.1",
- "port": 2000
+ "port": 2000,
+ "batch_max_size": 1
}
},
"upstream": {
@@ -224,4 +228,4 @@ GET /t
--- error_log
failed to connect to UDP server: host[312.0.0.1] port[2000]
[error]
---- wait: 0.2
+--- wait: 2