This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new c210add  feat(batchprocessor): support partial consumption of batch entries (#6203)
c210add is described below

commit c210add58becdf609baef29961c1c01fd12e91fa
Author: Bisakh <[email protected]>
AuthorDate: Fri Jan 28 13:38:46 2022 +0530

    feat(batchprocessor): support partial consumption of batch entries (#6203)
---
 apisix/plugins/datadog.lua        | 207 +++++++++++++++++++++-----------------
 apisix/plugins/loggly.lua         |   2 +-
 apisix/utils/batch-processor.lua  |  29 +++++-
 docs/en/latest/batch-processor.md |   7 +-
 docs/zh/latest/batch-processor.md |   7 +-
 t/utils/batch-processor.t         |  43 ++++++++
 6 files changed, 191 insertions(+), 104 deletions(-)
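
The key change is that a batch handler may now return a third value, first_fail, so the
batch processor retries only the entries that were not yet consumed. Below is a minimal
sketch of such a handler, assuming a hypothetical per-entry sender send_one(entry) that
returns ok, err (the same per-entry pattern the loggly and datadog plugins follow):

    local function handler(entries)
        for i = 1, #entries do
            local ok, err = send_one(entries[i])  -- hypothetical per-entry sender
            if not ok then
                -- entries[1 .. i-1] were consumed successfully; returning i tells
                -- the batch processor to retry only entries[i .. #entries]
                return false, err, i
            end
        end
        return true
    end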

diff --git a/apisix/plugins/datadog.lua b/apisix/plugins/datadog.lua
index 154e89c..7f0ed8a 100644
--- a/apisix/plugins/datadog.lua
+++ b/apisix/plugins/datadog.lua
@@ -23,7 +23,6 @@ local ngx = ngx
 local udp = ngx.socket.udp
 local format = string.format
 local concat = table.concat
-local ipairs = ipairs
 local tostring = tostring
 
 local plugin_name = "datadog"
@@ -64,6 +63,7 @@ local _M = {
     metadata_schema = metadata_schema,
 }
 
+
 function _M.check_schema(conf, schema_type)
     if schema_type == core.schema.TYPE_METADATA then
         return core.schema.check(metadata_schema, conf)
@@ -71,6 +71,7 @@ function _M.check_schema(conf, schema_type)
     return core.schema.check(schema, conf)
 end
 
+
 local function generate_tag(entry, const_tags)
     local tags
     if const_tags and #const_tags > 0 then
@@ -108,127 +109,143 @@ local function generate_tag(entry, const_tags)
 end
 
 
-function _M.log(conf, ctx)
-    local entry = fetch_log(ngx, {})
-    entry.balancer_ip = ctx.balancer_ip or ""
-    entry.scheme = ctx.upstream_scheme or ""
+local function send_metric_over_udp(entry, metadata)
+    local err_msg
+    local sock = udp()
+    local host, port = metadata.value.host, metadata.value.port
 
-    -- if prefer_name is set, fetch the service/route name. If the name is nil, fall back to id.
-    if conf.prefer_name then
-        if entry.service_id and entry.service_id ~= "" then
-            local svc = service_fetch(entry.service_id)
+    local ok, err = sock:setpeername(host, port)
+    if not ok then
+        return false, "failed to connect to UDP server: host[" .. host
+                      .. "] port[" .. tostring(port) .. "] err: " .. err
+    end
 
-            if svc and svc.value.name ~= "" then
-                entry.service_id =  svc.value.name
-            end
-        end
+    -- Generate prefix & suffix according dogstatsd udp data format.
+    local suffix = generate_tag(entry, metadata.value.constant_tags)
+    local prefix = metadata.value.namespace
+    if prefix ~= "" then
+        prefix = prefix .. "."
+    end
 
-        if ctx.route_name and ctx.route_name ~= "" then
-            entry.route_id = ctx.route_name
-        end
+    -- request counter
+    ok, err = sock:send(format("%s:%s|%s%s", prefix .. "request.counter", 1, "c", suffix))
+    if not ok then
+        err_msg = "error sending request.counter: " .. err
+        core.log.error("failed to report request count to dogstatsd server: host[" .. host
+                       .. "] port[" .. tostring(port) .. "] err: " .. err)
     end
 
-    if batch_processor_manager:add_entry(conf, entry) then
-        return
+    -- request latency histogram
+    ok, err = sock:send(format("%s:%s|%s%s", prefix .. "request.latency",
+                               entry.latency, "h", suffix))
+    if not ok then
+        err_msg = "error sending request.latency: " .. err
+        core.log.error("failed to report request latency to dogstatsd server: host["
+                       .. host .. "] port[" .. tostring(port) .. "] err: " .. err)
     end
 
-    -- Generate a function to be executed by the batch processor
-    local func = function(entries, batch_max_size)
-        -- Fetching metadata details
-        local metadata = plugin.plugin_metadata(plugin_name)
-        if not metadata then
-            core.log.info("received nil metadata: using metadata defaults: ",
-                                core.json.delay_encode(defaults, true))
-            metadata = {}
-            metadata.value = defaults
+    -- upstream latency
+    if entry.upstream_latency then
+        ok, err = sock:send(format("%s:%s|%s%s", prefix .. "upstream.latency",
+                                   entry.upstream_latency, "h", suffix))
+        if not ok then
+            err_msg = "error sending upstream.latency: " .. err
+            core.log.error("failed to report upstream latency to dogstatsd server: host["
+                           .. host .. "] port[" .. tostring(port) .. "] err: " .. err)
         end
+    end
 
-        -- Creating a udp socket
-        local sock = udp()
-        local host, port = metadata.value.host, metadata.value.port
-        core.log.info("sending batch metrics to dogstatsd: ", host, ":", port)
+    -- apisix_latency
+    ok, err = sock:send(format("%s:%s|%s%s", prefix .. "apisix.latency",
+                               entry.apisix_latency, "h", suffix))
+    if not ok then
+        err_msg = "error sending apisix.latency: " .. err
+        core.log.error("failed to report apisix latency to dogstatsd server: host[" .. host
+                       .. "] port[" .. tostring(port) .. "] err: " .. err)
+    end
 
-        local ok, err = sock:setpeername(host, port)
+    -- request body size timer
+    ok, err = sock:send(format("%s:%s|%s%s", prefix .. "ingress.size",
+                               entry.request.size, "ms", suffix))
+    if not ok then
+        err_msg = "error sending ingress.size: " .. err
+        core.log.error("failed to report req body size to dogstatsd server: host[" .. host
+                       .. "] port[" .. tostring(port) .. "] err: " .. err)
+    end
 
-        if not ok then
-            return false, "failed to connect to UDP server: host[" .. host
-                        .. "] port[" .. tostring(port) .. "] err: " .. err
-        end
+    -- response body size timer
+    ok, err = sock:send(format("%s:%s|%s%s", prefix .. "egress.size",
+                               entry.response.size, "ms", suffix))
+    if not ok then
+        err_msg = "error sending egress.size: " .. err
+        core.log.error("failed to report response body size to dogstatsd server: host["
+                       .. host .. "] port[" .. tostring(port) .. "] err: " .. err)
+    end
 
-        -- Generate prefix & suffix according dogstatsd udp data format.
-        local prefix = metadata.value.namespace
-        if prefix ~= "" then
-            prefix = prefix .. "."
-        end
+    ok, err = sock:close()
+    if not ok then
+        core.log.error("failed to close the UDP connection, host[",
+                       host, "] port[", port, "] ", err)
+    end
 
-        core.log.info("datadog batch_entry: ", core.json.delay_encode(entries, true))
-        for _, entry in ipairs(entries) do
-            local suffix = generate_tag(entry, metadata.value.constant_tags)
+    if not err_msg then
+        return true
+    end
 
-            -- request counter
-            local ok, err = sock:send(format("%s:%s|%s%s", prefix ..
-                                            "request.counter", 1, "c", suffix))
-            if not ok then
-                core.log.error("failed to report request count to dogstatsd server: host[" .. host
-                        .. "] port[" .. tostring(port) .. "] err: " .. err)
-            end
+    return false, err_msg
+end
 
 
-            -- request latency histogram
-            local ok, err = sock:send(format("%s:%s|%s%s", prefix ..
-                                        "request.latency", entry.latency, "h", suffix))
-            if not ok then
-                core.log.error("failed to report request latency to dogstatsd server: host["
-                        .. host .. "] port[" .. tostring(port) .. "] err: " .. err)
-            end
+local function push_metrics(entries)
+    -- Fetching metadata details
+    local metadata = plugin.plugin_metadata(plugin_name)
+    core.log.info("metadata: ", core.json.delay_encode(metadata))
 
-            -- upstream latency
-            if entry.upstream_latency then
-                local ok, err = sock:send(format("%s:%s|%s%s", prefix ..
-                                "upstream.latency", entry.upstream_latency, "h", suffix))
-                if not ok then
-                    core.log.error("failed to report upstream latency to dogstatsd server: host["
-                                .. host .. "] port[" .. tostring(port) .. "] err: " .. err)
-                end
-            end
+    if not metadata then
+        core.log.info("received nil metadata: using metadata defaults: ",
+                      core.json.delay_encode(defaults, true))
+        metadata = {}
+        metadata.value = defaults
+    end
+    core.log.info("sending batch metrics to dogstatsd: ", metadata.value.host,
+                  ":", metadata.value.port)
 
-            -- apisix_latency
-            local ok, err = sock:send(format("%s:%s|%s%s", prefix ..
-                                    "apisix.latency", entry.apisix_latency, "h", suffix))
-            if not ok then
-                core.log.error("failed to report apisix latency to dogstatsd server: host[" .. host
-                        .. "] port[" .. tostring(port) .. "] err: " .. err)
-            end
+    for i = 1, #entries do
+        local ok, err = send_metric_over_udp(entries[i], metadata)
+        if not ok then
+            return false, err, i
+        end
+    end
+
+    return true
+end
 
-            -- request body size timer
-            local ok, err = sock:send(format("%s:%s|%s%s", prefix ..
-                                            "ingress.size", entry.request.size, "ms", suffix))
-            if not ok then
-                core.log.error("failed to report req body size to dogstatsd server: host[" .. host
-                        .. "] port[" .. tostring(port) .. "] err: " .. err)
-            end
 
-            -- response body size timer
-            local ok, err = sock:send(format("%s:%s|%s%s", prefix ..
-                                            "egress.size", entry.response.size, "ms", suffix))
-            if not ok then
-                core.log.error("failed to report response body size to dogstatsd server: host["
-                        .. host .. "] port[" .. tostring(port) .. "] err: " .. err)
+function _M.log(conf, ctx)
+    local entry = fetch_log(ngx, {})
+    entry.balancer_ip = ctx.balancer_ip or ""
+    entry.scheme = ctx.upstream_scheme or ""
+
+    -- if prefer_name is set, fetch the service/route name. If the name is nil, fall back to id.
+    if conf.prefer_name then
+        if entry.service_id and entry.service_id ~= "" then
+            local svc = service_fetch(entry.service_id)
+
+            if svc and svc.value.name ~= "" then
+                entry.service_id =  svc.value.name
             end
         end
 
-        -- Releasing the UDP socket descriptor
-        ok, err = sock:close()
-        if not ok then
-            core.log.error("failed to close the UDP connection, host[",
-                            host, "] port[", port, "] ", err)
+        if ctx.route_name and ctx.route_name ~= "" then
+            entry.route_id = ctx.route_name
         end
+    end
 
-        -- Returning at the end and ensuring the resource has been released.
-        return true
+    if batch_processor_manager:add_entry(conf, entry) then
+        return
     end
 
-    batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
+    batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, push_metrics)
 end
 
 return _M
diff --git a/apisix/plugins/loggly.lua b/apisix/plugins/loggly.lua
index 5bd35fe..d182888 100644
--- a/apisix/plugins/loggly.lua
+++ b/apisix/plugins/loggly.lua
@@ -292,7 +292,7 @@ local function handle_log(entries)
         for i = 1, #entries do
             local ok, err = send_data_over_udp(entries[i], metadata)
             if not ok then
-                return false, err
+                return false, err, i
             end
         end
     else
diff --git a/apisix/utils/batch-processor.lua b/apisix/utils/batch-processor.lua
index 8ba4813..dcd3d59 100644
--- a/apisix/utils/batch-processor.lua
+++ b/apisix/utils/batch-processor.lua
@@ -63,17 +63,38 @@ local function set_metrics(self, count)
 end
 
 
+local function slice_batch(batch, n)
+    local slice = {}
+    local idx = 1
+    for i = n or 1, #batch do
+        slice[idx] = batch[i]
+        idx = idx + 1
+    end
+    return slice
+end
+
+
 function execute_func(premature, self, batch)
     if premature then
         return
     end
 
-    local ok, err = self.func(batch.entries, self.batch_max_size)
+    -- In case of "err" and a valid "first_fail" batch processor considers, all first_fail-1
+    -- entries have been successfully consumed and hence reschedule the job for entries with
+    -- index first_fail to #entries based on the current retry policy.
+    local ok, err, first_fail = self.func(batch.entries, self.batch_max_size)
     if not ok then
-        core.log.error("Batch Processor[", self.name,
-                       "] failed to process entries: ", err)
+        if first_fail then
+            core.log.error("Batch Processor[", self.name, "] failed to process entries [",
+                            #batch.entries + 1 - first_fail, "/", #batch.entries ,"]: ", err)
+            batch.entries = slice_batch(batch.entries, first_fail)
+        else
+            core.log.error("Batch Processor[", self.name,
+                           "] failed to process entries: ", err)
+        end
+
         batch.retry_count = batch.retry_count + 1
-        if batch.retry_count <= self.max_retry_count then
+        if batch.retry_count <= self.max_retry_count and #batch.entries > 0 then
             schedule_func_exec(self, self.retry_delay,
                                batch)
         else
diff --git a/docs/en/latest/batch-processor.md b/docs/en/latest/batch-processor.md
index 08ec5dc..6f18cf3 100644
--- a/docs/en/latest/batch-processor.md
+++ b/docs/en/latest/batch-processor.md
@@ -70,7 +70,10 @@ function _M.log(conf, ctx)
         -- serialize to json array core.json.encode(entries)
         -- process/send data
         return true
-        -- return false, err_msg if failed
+        -- return false, err_msg, first_fail if failed
+        -- first_fail(optional) indicates first_fail-1 entries have been successfully processed
+        -- and during processing of entries[first_fail], the error occurred. So the batch processor
+        -- only retries for the entries having index >= first_fail as per the retry policy.
     end
     batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
 end
@@ -120,7 +123,7 @@ local err
 local func = function(entries)
     ...
     return true
-    -- return false, err_msg if failed
+    -- return false, err_msg, first_fail if failed
 end
 log_buffer, err = batch_processor:new(func, config_bat)
 
diff --git a/docs/zh/latest/batch-processor.md b/docs/zh/latest/batch-processor.md
index 78f755c..3096a1b 100644
--- a/docs/zh/latest/batch-processor.md
+++ b/docs/zh/latest/batch-processor.md
@@ -68,7 +68,10 @@ function _M.log(conf, ctx)
         -- serialize to json array core.json.encode(entries)
         -- process/send data
         return true
-        -- return false, err_msg if failed
+        -- return false, err_msg, first_fail if failed
+        -- first_fail(optional) indicates first_fail-1 entries have been successfully processed
+        -- and during processing of entries[first_fail], the error occurred. So the batch processor
+        -- only retries for the entries having index >= first_fail as per the retry policy.
     end
     batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
 end
@@ -118,7 +121,7 @@ local err
 local func = function(entries)
     ...
     return true
-    -- return false, err_msg if failed
+    -- return false, err_msg, first_fail if failed
 end
 log_buffer, err = batch_processor:new(func, config_bat)
 
diff --git a/t/utils/batch-processor.t b/t/utils/batch-processor.t
index 15cf5cc..e1ce83b 100644
--- a/t/utils/batch-processor.t
+++ b/t/utils/batch-processor.t
@@ -438,3 +438,46 @@ Batch Processor[log buffer] activating flush due to no activity
 --- error_log
 Batch Processor[log buffer] extending buffer timer
 --- wait: 3
+
+
+
+=== TEST 12: partially consumed entries
+--- config
+    location /t {
+        content_by_lua_block {
+            local Batch = require("apisix.utils.batch-processor")
+            local core = require("apisix.core")
+            local config = {
+                max_retry_count  = 1,
+                batch_max_size = 3,
+                retry_delay  = 0,
+                inactive_timeout = 1
+            }
+            local func_to_send = function(elements)
+                core.log.info(require("toolkit.json").encode(elements))
+                return false, "error after consuming single entry", 2
+            end
+            local log_buffer, err = Batch:new(func_to_send, config)
+
+            if not log_buffer then
+                ngx.say(err)
+            end
+
+            log_buffer:push({msg='1'})
+            log_buffer:push({msg='2'})
+            log_buffer:push({msg='3'})
+            log_buffer:push({msg='4'})
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+done
+--- error_log
+[{"msg":"1"},{"msg":"2"},{"msg":"3"}]
+Batch Processor[log buffer] failed to process entries [2/3]: error after consuming single entry
+[{"msg":"2"},{"msg":"3"}]
+Batch Processor[log buffer] failed to process entries [1/2]: error after consuming single entry
+[{"msg":"4"}]
+--- wait: 2
