membphis commented on a change in pull request #1355: Updating the UDP logger to use the batch processor util URL: https://github.com/apache/incubator-apisix/pull/1355#discussion_r401362090
########## File path: lua/apisix/plugins/udp-logger.lua ########## @@ -46,38 +51,85 @@ function _M.check_schema(conf) return core.schema.check(schema, conf) end -local function log(premature, conf, log_message) - if premature then - return - end - +local function send_udp_data(conf, log_message) + local err_msg + local res = true local sock = udp() - sock:settimeout(conf.timeout) - + sock:settimeout(conf.timeout * 1000) local ok, err = sock:setpeername(conf.host, conf.port) + if not ok then - core.log.error("failed to connect to UDP server: host[", - conf.host, "] port[", conf.port, "] ", err) - return + return nil, "failed to connect to UDP server: host[" .. conf.host + .. "] port[" .. tostring(conf.port) .. "] err: " .. err end ok, err = sock:send(log_message) if not ok then - core.log.error("failed to send data to UDP server: host[", - conf.host, "] port[", conf.port, "] ", err) + res = false + err_msg = "failed to send data to UDP server: host[" .. conf.host + .. "] port[" .. tostring(conf.port) .. "] err:" .. err end ok, err = sock:close() if not ok then core.log.error("failed to close the UDP connection, host[", - conf.host, "] port[", conf.port, "] ", err) + conf.host, "] port[", conf.port, "] ", err) end + + return res, err_msg end function _M.log(conf) - return timer_at(0, log, conf, core.json.encode(log_util.get_full_log(ngx))) + local entry = log_util.get_full_log(ngx) + + if not entry.route_id then + core.log.error("failed to obtain the route id for tcp logger") + return + end + + local log_buffer = buffers[entry.route_id] + + if log_buffer then + log_buffer:push(entry) + return + end + + -- Generate a function to be executed by the batch processor + local func = function(entries, batch_max_size) + local data, err + if batch_max_size == 1 then + data, err = core.json.encode(entries[1]) -- encode as single {} + else + data, err = core.json.encode(entries) -- encode as array [{}] + end + + if not data then + core.log.error('error occurred while encoding the token: ', err) + end + + return send_udp_data(conf, data) + end + + local config = { + name = conf.name, + retry_delay = conf.retry_delay, + batch_max_size = conf.batch_max_size, + max_retry_count = conf.max_retry_count, + buffer_duration = conf.buffer_duration, + inactive_timeout = conf.inactive_timeout, + } + + local err + log_buffer, err = batch_processor:new(func, config) + + if not log_buffer then + core.log.error("error when creating the batch processor: ", err) + return + end + + buffers[entry.route_id] = log_buffer Review comment: We need a way to control the number of objects in `buffers` to avoid memory overflow. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services