This is an automated email from the ASF dual-hosted git repository.

wenming pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 25e1101  feature: Updating the UDP logger to use the batch processor 
util (#1355)
25e1101 is described below

commit 25e110135a7183a74de2270bcea32b78dfe195fc
Author: Nirojan Selvanathan <[email protected]>
AuthorDate: Fri Apr 3 10:33:32 2020 +0200

    feature: Updating the UDP logger to use the batch processor util (#1355)
---
 apisix/plugins/udp-logger.lua | 92 +++++++++++++++++++++++++++++++++----------
 doc/plugins/udp-logger.md     | 25 +++++++-----
 t/plugin/udp-logger.t         | 14 ++++---
 3 files changed, 95 insertions(+), 36 deletions(-)

diff --git a/apisix/plugins/udp-logger.lua b/apisix/plugins/udp-logger.lua
index d28c5db..461767b 100644
--- a/apisix/plugins/udp-logger.lua
+++ b/apisix/plugins/udp-logger.lua
@@ -14,12 +14,13 @@
 -- See the License for the specific language governing permissions and
 -- limitations under the License.
 --
-local core     = require("apisix.core")
+local core = require("apisix.core")
 local log_util = require("apisix.utils.log-util")
+local batch_processor = require("apisix.utils.batch-processor")
 local plugin_name = "udp-logger"
+local tostring = tostring
+local buffers = {}
 local ngx = ngx
-
-local timer_at = ngx.timer.at
 local udp = ngx.socket.udp
 
 local schema = {
@@ -27,9 +28,11 @@ local schema = {
     properties = {
         host = {type = "string"},
         port = {type = "integer", minimum = 0},
-        timeout = {   -- timeout in milliseconds
-            type = "integer", minimum = 1, default= 1000
-        }
+        timeout = {type = "integer", minimum = 1, default = 3},
+        name = {type = "string", default = "udp logger"},
+        buffer_duration = {type = "integer", minimum = 1, default = 60},
+        inactive_timeout = {type = "integer", minimum = 1, default = 5},
+        batch_max_size = {type = "integer", minimum = 1, default = 1000},
     },
     required = {"host", "port"}
 }
@@ -46,38 +49,85 @@ function _M.check_schema(conf)
     return core.schema.check(schema, conf)
 end
 
-local function log(premature, conf, log_message)
-    if premature then
-        return
-    end
-
+local function send_udp_data(conf, log_message)
+    local err_msg
+    local res = true
     local sock = udp()
-    sock:settimeout(conf.timeout)
-
+    sock:settimeout(conf.timeout * 1000)
     local ok, err = sock:setpeername(conf.host, conf.port)
+
     if not ok then
-        core.log.error("failed to connect to UDP server: host[",
-                       conf.host, "] port[", conf.port, "] ", err)
-        return
+        return nil, "failed to connect to UDP server: host[" .. conf.host
+                    .. "] port[" .. tostring(conf.port) .. "] err: " .. err
     end
 
     ok, err = sock:send(log_message)
     if not ok then
-        core.log.error("failed to send data to UDP server: host[",
-                       conf.host, "] port[", conf.port, "] ", err)
+        res = false
+        err_msg = "failed to send data to UDP server: host[" .. conf.host
+                  .. "] port[" .. tostring(conf.port) .. "] err:" .. err
     end
 
     ok, err = sock:close()
     if not ok then
         core.log.error("failed to close the UDP connection, host[",
-                       conf.host, "] port[", conf.port, "] ", err)
+                        conf.host, "] port[", conf.port, "] ", err)
     end
+
+    return res, err_msg
 end
 
 
 function _M.log(conf)
-    return timer_at(0, log, conf, core.json.encode(log_util.get_full_log(ngx)))
+    local entry = log_util.get_full_log(ngx)
+
+    if not entry.route_id then
+        core.log.error("failed to obtain the route id for tcp logger")
+        return
+    end
+
+    local log_buffer = buffers[entry.route_id]
+
+    if log_buffer then
+        log_buffer:push(entry)
+        return
+    end
+
+    -- Generate a function to be executed by the batch processor
+    local func = function(entries, batch_max_size)
+        local data, err
+        if batch_max_size == 1 then
+            data, err = core.json.encode(entries[1]) -- encode as single {}
+        else
+            data, err = core.json.encode(entries) -- encode as array [{}]
+        end
+
+        if not data then
+            return false, 'error occurred while encoding the data: ' .. err
+        end
+
+        return send_udp_data(conf, data)
+    end
+
+    local config = {
+        name = conf.name,
+        retry_delay = conf.retry_delay,
+        batch_max_size = conf.batch_max_size,
+        max_retry_count = conf.max_retry_count,
+        buffer_duration = conf.buffer_duration,
+        inactive_timeout = conf.inactive_timeout,
+    }
+
+    local err
+    log_buffer, err = batch_processor:new(func, config)
+
+    if not log_buffer then
+        core.log.error("error when creating the batch processor: ", err)
+        return
+    end
+
+    buffers[entry.route_id] = log_buffer
+    log_buffer:push(entry)
 end
 
 return _M
-
diff --git a/doc/plugins/udp-logger.md b/doc/plugins/udp-logger.md
index a2753d8..e6e7149 100644
--- a/doc/plugins/udp-logger.md
+++ b/doc/plugins/udp-logger.md
@@ -33,15 +33,19 @@ This will provide the ability to send Log data requests as 
JSON objects to Monit
 
 ## Attributes
 
-|Name          |Requirement  |Description|
-|---------     |--------|-----------|
-| host |required| IP address or the Hostname of the UDP server.|
-| port |required| Target upstream port.|
-| timeout |optional|Timeout for the upstream to send data.|
+|Name           |Requirement    |Description|
+|---------      |--------       |-----------|
+|host           |required       | IP address or the Hostname of the UDP 
server.|
+|port           |required       | Target upstream port.|
+|timeout        |optional       |Timeout for the upstream to send data.|
+|name           |optional       |A unique identifier to identify the batch 
processor|
+|batch_max_size |optional       |Max size of each batch, default is 1000|
+|inactive_timeout|optional      |Maximum age in seconds when the buffer will 
be flushed if inactive, default is 5s|
+|buffer_duration|optional       |Maximum age in seconds of the oldest entry in 
a batch before the batch must be processed, default is 60|
 
 ## How To Enable
 
-1. Here is an examle on how to enable udp-logger plugin for a specific route.
+The following is an example on how to enable the udp-logger for a specific 
route.
 
 ```shell
 curl http://127.0.0.1:9080/apisix/admin/consumers -H 'X-API-KEY: 
edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
@@ -51,7 +55,9 @@ curl http://127.0.0.1:9080/apisix/admin/consumers -H 
'X-API-KEY: edd1c9f034335f1
           "plugins": {
                 "udp-logger": {
                      "host": "127.0.0.1",
-                     "port": 3000
+                     "port": 3000,
+                     "batch_max_size": 1,
+                     "name": "udp logger"
                 }
            },
           "upstream": {
@@ -78,9 +84,8 @@ hello, world
 
 ## Disable Plugin
 
-When you want to disable the `udp-logger` plugin, it is very simple,
- you can delete the corresponding json configuration in the plugin 
configuration,
-  no need to restart the service, it will take effect immediately:
+Remove the corresponding json configuration in the plugin configuration to 
disable the `udp-logger`.
+APISIX plugins are hot-reloaded, so there is no need to restart APISIX.
 
 ```shell
 $ curl http://127.0.0.1:2379/apisix/admin/routes/1 -X PUT -d value='
diff --git a/t/plugin/udp-logger.t b/t/plugin/udp-logger.t
index e30066c..3112680 100644
--- a/t/plugin/udp-logger.t
+++ b/t/plugin/udp-logger.t
@@ -102,7 +102,8 @@ done
                         "plugins": {
                             "udp-logger": {
                                 "host": "127.0.0.1",
-                                "port": 2000
+                                "port": 2000,
+                                "batch_max_size": 1
                             }
                         },
                         "upstream": {
@@ -119,7 +120,8 @@ done
                             "plugins": {
                                 "udp-logger": {
                                     "host": "127.0.0.1",
-                                    "port": 2000
+                                    "port": 2000,
+                                    "batch_max_size": 1
                                 }
                             },
                             "upstream": {
@@ -173,7 +175,8 @@ opentracing
                         "plugins": {
                             "udp-logger": {
                                 "host": "312.0.0.1",
-                                "port": 2000
+                                "port": 2000,
+                                "batch_max_size": 1
                             }
                         },
                         "upstream": {
@@ -190,7 +193,8 @@ opentracing
                             "plugins": {
                                 "udp-logger": {
                                     "host": "312.0.0.1",
-                                    "port": 2000
+                                    "port": 2000,
+                                    "batch_max_size": 1
                                 }
                             },
                             "upstream": {
@@ -224,4 +228,4 @@ GET /t
 --- error_log
 failed to connect to UDP server: host[312.0.0.1] port[2000]
 [error]
---- wait: 0.2
+--- wait: 2

Reply via email to