This is an automated email from the ASF dual-hosted git repository.
spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new c210add feat(batchprocessor): support partial consumption of batch
entries (#6203)
c210add is described below
commit c210add58becdf609baef29961c1c01fd12e91fa
Author: Bisakh <[email protected]>
AuthorDate: Fri Jan 28 13:38:46 2022 +0530
feat(batchprocessor): support partial consumption of batch entries (#6203)
---
apisix/plugins/datadog.lua | 207 +++++++++++++++++++++-----------------
apisix/plugins/loggly.lua | 2 +-
apisix/utils/batch-processor.lua | 29 +++++-
docs/en/latest/batch-processor.md | 7 +-
docs/zh/latest/batch-processor.md | 7 +-
t/utils/batch-processor.t | 43 ++++++++
6 files changed, 191 insertions(+), 104 deletions(-)
diff --git a/apisix/plugins/datadog.lua b/apisix/plugins/datadog.lua
index 154e89c..7f0ed8a 100644
--- a/apisix/plugins/datadog.lua
+++ b/apisix/plugins/datadog.lua
@@ -23,7 +23,6 @@ local ngx = ngx
local udp = ngx.socket.udp
local format = string.format
local concat = table.concat
-local ipairs = ipairs
local tostring = tostring
local plugin_name = "datadog"
@@ -64,6 +63,7 @@ local _M = {
metadata_schema = metadata_schema,
}
+
function _M.check_schema(conf, schema_type)
if schema_type == core.schema.TYPE_METADATA then
return core.schema.check(metadata_schema, conf)
@@ -71,6 +71,7 @@ function _M.check_schema(conf, schema_type)
return core.schema.check(schema, conf)
end
+
local function generate_tag(entry, const_tags)
local tags
if const_tags and #const_tags > 0 then
@@ -108,127 +109,143 @@ local function generate_tag(entry, const_tags)
end
-function _M.log(conf, ctx)
- local entry = fetch_log(ngx, {})
- entry.balancer_ip = ctx.balancer_ip or ""
- entry.scheme = ctx.upstream_scheme or ""
+local function send_metric_over_udp(entry, metadata)
+ local err_msg
+ local sock = udp()
+ local host, port = metadata.value.host, metadata.value.port
- -- if prefer_name is set, fetch the service/route name. If the name is
nil, fall back to id.
- if conf.prefer_name then
- if entry.service_id and entry.service_id ~= "" then
- local svc = service_fetch(entry.service_id)
+ local ok, err = sock:setpeername(host, port)
+ if not ok then
+ return false, "failed to connect to UDP server: host[" .. host
+ .. "] port[" .. tostring(port) .. "] err: " .. err
+ end
- if svc and svc.value.name ~= "" then
- entry.service_id = svc.value.name
- end
- end
+    -- Generate prefix & suffix according to the dogstatsd UDP data format.
+ local suffix = generate_tag(entry, metadata.value.constant_tags)
+ local prefix = metadata.value.namespace
+ if prefix ~= "" then
+ prefix = prefix .. "."
+ end
- if ctx.route_name and ctx.route_name ~= "" then
- entry.route_id = ctx.route_name
- end
+ -- request counter
+ ok, err = sock:send(format("%s:%s|%s%s", prefix .. "request.counter", 1,
"c", suffix))
+ if not ok then
+ err_msg = "error sending request.counter: " .. err
+ core.log.error("failed to report request count to dogstatsd server:
host[" .. host
+ .. "] port[" .. tostring(port) .. "] err: " .. err)
end
- if batch_processor_manager:add_entry(conf, entry) then
- return
+ -- request latency histogram
+ ok, err = sock:send(format("%s:%s|%s%s", prefix .. "request.latency",
+ entry.latency, "h", suffix))
+ if not ok then
+ err_msg = "error sending request.latency: " .. err
+ core.log.error("failed to report request latency to dogstatsd server:
host["
+ .. host .. "] port[" .. tostring(port) .. "] err: " ..
err)
end
- -- Generate a function to be executed by the batch processor
- local func = function(entries, batch_max_size)
- -- Fetching metadata details
- local metadata = plugin.plugin_metadata(plugin_name)
- if not metadata then
- core.log.info("received nil metadata: using metadata defaults: ",
- core.json.delay_encode(defaults, true))
- metadata = {}
- metadata.value = defaults
+ -- upstream latency
+ if entry.upstream_latency then
+ ok, err = sock:send(format("%s:%s|%s%s", prefix .. "upstream.latency",
+ entry.upstream_latency, "h", suffix))
+ if not ok then
+ err_msg = "error sending upstream.latency: " .. err
+ core.log.error("failed to report upstream latency to dogstatsd
server: host["
+ .. host .. "] port[" .. tostring(port) .. "] err: "
.. err)
end
+ end
- -- Creating a udp socket
- local sock = udp()
- local host, port = metadata.value.host, metadata.value.port
- core.log.info("sending batch metrics to dogstatsd: ", host, ":", port)
+ -- apisix_latency
+ ok, err = sock:send(format("%s:%s|%s%s", prefix .. "apisix.latency",
+ entry.apisix_latency, "h", suffix))
+ if not ok then
+ err_msg = "error sending apisix.latency: " .. err
+ core.log.error("failed to report apisix latency to dogstatsd server:
host[" .. host
+ .. "] port[" .. tostring(port) .. "] err: " .. err)
+ end
- local ok, err = sock:setpeername(host, port)
+ -- request body size timer
+ ok, err = sock:send(format("%s:%s|%s%s", prefix .. "ingress.size",
+ entry.request.size, "ms", suffix))
+ if not ok then
+ err_msg = "error sending ingress.size: " .. err
+ core.log.error("failed to report req body size to dogstatsd server:
host[" .. host
+ .. "] port[" .. tostring(port) .. "] err: " .. err)
+ end
- if not ok then
- return false, "failed to connect to UDP server: host[" .. host
- .. "] port[" .. tostring(port) .. "] err: " .. err
- end
+ -- response body size timer
+ ok, err = sock:send(format("%s:%s|%s%s", prefix .. "egress.size",
+ entry.response.size, "ms", suffix))
+ if not ok then
+ err_msg = "error sending egress.size: " .. err
+ core.log.error("failed to report response body size to dogstatsd
server: host["
+ .. host .. "] port[" .. tostring(port) .. "] err: " ..
err)
+ end
- -- Generate prefix & suffix according dogstatsd udp data format.
- local prefix = metadata.value.namespace
- if prefix ~= "" then
- prefix = prefix .. "."
- end
+ ok, err = sock:close()
+ if not ok then
+ core.log.error("failed to close the UDP connection, host[",
+ host, "] port[", port, "] ", err)
+ end
- core.log.info("datadog batch_entry: ", core.json.delay_encode(entries,
true))
- for _, entry in ipairs(entries) do
- local suffix = generate_tag(entry, metadata.value.constant_tags)
+ if not err_msg then
+ return true
+ end
- -- request counter
- local ok, err = sock:send(format("%s:%s|%s%s", prefix ..
- "request.counter", 1, "c", suffix))
- if not ok then
- core.log.error("failed to report request count to dogstatsd
server: host[" .. host
- .. "] port[" .. tostring(port) .. "] err: " .. err)
- end
+ return false, err_msg
+end
- -- request latency histogram
- local ok, err = sock:send(format("%s:%s|%s%s", prefix ..
- "request.latency", entry.latency, "h",
suffix))
- if not ok then
- core.log.error("failed to report request latency to dogstatsd
server: host["
- .. host .. "] port[" .. tostring(port) .. "] err: " ..
err)
- end
+local function push_metrics(entries)
+ -- Fetching metadata details
+ local metadata = plugin.plugin_metadata(plugin_name)
+ core.log.info("metadata: ", core.json.delay_encode(metadata))
- -- upstream latency
- if entry.upstream_latency then
- local ok, err = sock:send(format("%s:%s|%s%s", prefix ..
- "upstream.latency", entry.upstream_latency,
"h", suffix))
- if not ok then
- core.log.error("failed to report upstream latency to
dogstatsd server: host["
- .. host .. "] port[" .. tostring(port) .. "]
err: " .. err)
- end
- end
+ if not metadata then
+ core.log.info("received nil metadata: using metadata defaults: ",
+ core.json.delay_encode(defaults, true))
+ metadata = {}
+ metadata.value = defaults
+ end
+ core.log.info("sending batch metrics to dogstatsd: ", metadata.value.host,
+ ":", metadata.value.port)
- -- apisix_latency
- local ok, err = sock:send(format("%s:%s|%s%s", prefix ..
- "apisix.latency", entry.apisix_latency,
"h", suffix))
- if not ok then
- core.log.error("failed to report apisix latency to dogstatsd
server: host[" .. host
- .. "] port[" .. tostring(port) .. "] err: " .. err)
- end
+ for i = 1, #entries do
+ local ok, err = send_metric_over_udp(entries[i], metadata)
+ if not ok then
+ return false, err, i
+ end
+ end
+
+ return true
+end
- -- request body size timer
- local ok, err = sock:send(format("%s:%s|%s%s", prefix ..
- "ingress.size",
entry.request.size, "ms", suffix))
- if not ok then
- core.log.error("failed to report req body size to dogstatsd
server: host[" .. host
- .. "] port[" .. tostring(port) .. "] err: " .. err)
- end
- -- response body size timer
- local ok, err = sock:send(format("%s:%s|%s%s", prefix ..
- "egress.size",
entry.response.size, "ms", suffix))
- if not ok then
- core.log.error("failed to report response body size to
dogstatsd server: host["
- .. host .. "] port[" .. tostring(port) .. "] err: " ..
err)
+function _M.log(conf, ctx)
+ local entry = fetch_log(ngx, {})
+ entry.balancer_ip = ctx.balancer_ip or ""
+ entry.scheme = ctx.upstream_scheme or ""
+
+ -- if prefer_name is set, fetch the service/route name. If the name is
nil, fall back to id.
+ if conf.prefer_name then
+ if entry.service_id and entry.service_id ~= "" then
+ local svc = service_fetch(entry.service_id)
+
+ if svc and svc.value.name ~= "" then
+ entry.service_id = svc.value.name
end
end
- -- Releasing the UDP socket descriptor
- ok, err = sock:close()
- if not ok then
- core.log.error("failed to close the UDP connection, host[",
- host, "] port[", port, "] ", err)
+ if ctx.route_name and ctx.route_name ~= "" then
+ entry.route_id = ctx.route_name
end
+ end
- -- Returning at the end and ensuring the resource has been released.
- return true
+ if batch_processor_manager:add_entry(conf, entry) then
+ return
end
- batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
+ batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx,
push_metrics)
end
return _M
diff --git a/apisix/plugins/loggly.lua b/apisix/plugins/loggly.lua
index 5bd35fe..d182888 100644
--- a/apisix/plugins/loggly.lua
+++ b/apisix/plugins/loggly.lua
@@ -292,7 +292,7 @@ local function handle_log(entries)
for i = 1, #entries do
local ok, err = send_data_over_udp(entries[i], metadata)
if not ok then
- return false, err
+ return false, err, i
end
end
else
diff --git a/apisix/utils/batch-processor.lua b/apisix/utils/batch-processor.lua
index 8ba4813..dcd3d59 100644
--- a/apisix/utils/batch-processor.lua
+++ b/apisix/utils/batch-processor.lua
@@ -63,17 +63,38 @@ local function set_metrics(self, count)
end
+local function slice_batch(batch, n)
+ local slice = {}
+ local idx = 1
+ for i = n or 1, #batch do
+ slice[idx] = batch[i]
+ idx = idx + 1
+ end
+ return slice
+end
+
+
function execute_func(premature, self, batch)
if premature then
return
end
- local ok, err = self.func(batch.entries, self.batch_max_size)
+    -- In case of "err" and a valid "first_fail", the batch processor considers all
+    -- first_fail-1 entries to have been successfully consumed and hence reschedules
+    -- the job for entries with index first_fail to #entries based on the current
+    -- retry policy.
+ local ok, err, first_fail = self.func(batch.entries, self.batch_max_size)
if not ok then
- core.log.error("Batch Processor[", self.name,
- "] failed to process entries: ", err)
+ if first_fail then
+ core.log.error("Batch Processor[", self.name, "] failed to process
entries [",
+ #batch.entries + 1 - first_fail, "/",
#batch.entries ,"]: ", err)
+ batch.entries = slice_batch(batch.entries, first_fail)
+ else
+ core.log.error("Batch Processor[", self.name,
+ "] failed to process entries: ", err)
+ end
+
batch.retry_count = batch.retry_count + 1
- if batch.retry_count <= self.max_retry_count then
+ if batch.retry_count <= self.max_retry_count and #batch.entries > 0
then
schedule_func_exec(self, self.retry_delay,
batch)
else
diff --git a/docs/en/latest/batch-processor.md
b/docs/en/latest/batch-processor.md
index 08ec5dc..6f18cf3 100644
--- a/docs/en/latest/batch-processor.md
+++ b/docs/en/latest/batch-processor.md
@@ -70,7 +70,10 @@ function _M.log(conf, ctx)
-- serialize to json array core.json.encode(entries)
-- process/send data
return true
- -- return false, err_msg if failed
+ -- return false, err_msg, first_fail if failed
+        -- first_fail (optional) indicates that the first first_fail-1 entries have
+        -- been successfully processed and that the error occurred while processing
+        -- entries[first_fail]. So the batch processor only retries the entries with
+        -- index >= first_fail, as per the retry policy.
end
batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
end
@@ -120,7 +123,7 @@ local err
local func = function(entries)
...
return true
- -- return false, err_msg if failed
+ -- return false, err_msg, first_fail if failed
end
log_buffer, err = batch_processor:new(func, config_bat)
diff --git a/docs/zh/latest/batch-processor.md
b/docs/zh/latest/batch-processor.md
index 78f755c..3096a1b 100644
--- a/docs/zh/latest/batch-processor.md
+++ b/docs/zh/latest/batch-processor.md
@@ -68,7 +68,10 @@ function _M.log(conf, ctx)
-- serialize to json array core.json.encode(entries)
-- process/send data
return true
- -- return false, err_msg if failed
+ -- return false, err_msg, first_fail if failed
+        -- first_fail (optional) indicates that the first first_fail-1 entries have
+        -- been successfully processed and that the error occurred while processing
+        -- entries[first_fail]. So the batch processor only retries the entries with
+        -- index >= first_fail, as per the retry policy.
end
batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
end
@@ -118,7 +121,7 @@ local err
local func = function(entries)
...
return true
- -- return false, err_msg if failed
+ -- return false, err_msg, first_fail if failed
end
log_buffer, err = batch_processor:new(func, config_bat)
diff --git a/t/utils/batch-processor.t b/t/utils/batch-processor.t
index 15cf5cc..e1ce83b 100644
--- a/t/utils/batch-processor.t
+++ b/t/utils/batch-processor.t
@@ -438,3 +438,46 @@ Batch Processor[log buffer] activating flush due to no
activity
--- error_log
Batch Processor[log buffer] extending buffer timer
--- wait: 3
+
+
+
+=== TEST 12: partially consumed entries
+--- config
+ location /t {
+ content_by_lua_block {
+ local Batch = require("apisix.utils.batch-processor")
+ local core = require("apisix.core")
+ local config = {
+ max_retry_count = 1,
+ batch_max_size = 3,
+ retry_delay = 0,
+ inactive_timeout = 1
+ }
+ local func_to_send = function(elements)
+ core.log.info(require("toolkit.json").encode(elements))
+ return false, "error after consuming single entry", 2
+ end
+ local log_buffer, err = Batch:new(func_to_send, config)
+
+ if not log_buffer then
+ ngx.say(err)
+ end
+
+ log_buffer:push({msg='1'})
+ log_buffer:push({msg='2'})
+ log_buffer:push({msg='3'})
+ log_buffer:push({msg='4'})
+ ngx.say("done")
+ }
+ }
+--- request
+GET /t
+--- response_body
+done
+--- error_log
+[{"msg":"1"},{"msg":"2"},{"msg":"3"}]
+Batch Processor[log buffer] failed to process entries [2/3]: error after
consuming single entry
+[{"msg":"2"},{"msg":"3"}]
+Batch Processor[log buffer] failed to process entries [1/2]: error after
consuming single entry
+[{"msg":"4"}]
+--- wait: 2