This is an automated email from the ASF dual-hosted git repository.

wenming pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 52934b5  Batch Processor: Fix rescheduling of execution when max_retry_count is 0 or 1 (#1349)
52934b5 is described below

commit 52934b5fc78e4628ccdac35670438864fce90396
Author: Nirojan Selvanathan <sshn...@gmail.com>
AuthorDate: Mon Mar 30 03:44:33 2020 +0200

    Batch Processor: Fix rescheduling of execution when max_retry_count is 0 or 1 (#1349)
---
 lua/apisix/utils/batch-processor.lua | 27 ++++++++++++---------------
 t/utils/batch-processor.t            | 27 +++++++++++++--------------
 2 files changed, 25 insertions(+), 29 deletions(-)

diff --git a/lua/apisix/utils/batch-processor.lua b/lua/apisix/utils/batch-processor.lua
index 4fcbe6e..4196095 100644
--- a/lua/apisix/utils/batch-processor.lua
+++ b/lua/apisix/utils/batch-processor.lua
@@ -17,7 +17,6 @@
 local core = require("apisix.core")
 local setmetatable = setmetatable
 local timer_at = ngx.timer.at
-local fmt = string.format
 local ipairs = ipairs
 local table = table
 local now = ngx.now
@@ -57,22 +56,20 @@ function execute_func(premature, batch_processor, batch)
         return
     end
 
-    local ok, err = batch_processor.func(batch.entries)
+    local ok, err = batch_processor.func(batch.entries, 
batch_processor.batch_max_size)
     if not ok then
+        core.log.error("Batch Processor[", batch_processor.name, "] failed to 
process entries: ", err)
         batch.retry_count = batch.retry_count + 1
-        if batch.retry_count < batch_processor.max_retry_count then
-            core.log.warn(fmt("Batch Processor[%s] failed to process entries: 
",
-                batch_processor.name), err)
+        if batch.retry_count <= batch_processor.max_retry_count then
             schedule_func_exec(batch_processor, batch_processor.retry_delay, 
batch)
         else
-            core.log.error(fmt(("Batch Processor[%s] exceeded the 
max_retry_count[%d] "
-                    .. "dropping the entries"), batch_processor.name, 
batch.retry_count))
+            core.log.error("Batch Processor[", batch_processor.name,"] 
exceeded ",
+                "the max_retry_count[", batch.retry_count,"] dropping the 
entries")
         end
         return
     end
 
-    core.log.debug(fmt("Batch Processor[%s] successfully processed the 
entries",
-        batch_processor.name))
+    core.log.debug("Batch Processor[", batch_processor.name ,"] successfully 
processed the entries")
 end
 
 
@@ -83,15 +80,15 @@ local function flush_buffer(premature, batch_processor)
 
     if now() - batch_processor.last_entry_t >= 
batch_processor.inactive_timeout or
             now() - batch_processor.first_entry_t >= 
batch_processor.buffer_duration then
-        core.log.debug(fmt("BatchProcessor[%s] buffer duration exceeded, 
activating buffer flush",
-            batch_processor.name))
+        core.log.debug("Batch Processor[", batch_processor.name ,"] buffer ",
+            "duration exceeded, activating buffer flush")
         batch_processor:process_buffer()
         batch_processor.is_timer_running = false
         return
     end
 
     -- buffer duration did not exceed or the buffer is active, extending the 
timer
-    core.log.debug(fmt("BatchProcessor[%s] extending buffer timer", 
batch_processor.name))
+    core.log.debug("Batch Processor[", batch_processor.name ,"] extending 
buffer timer")
     create_buffer_timer(batch_processor)
 end
 
@@ -152,7 +149,7 @@ function Batch_Processor:push(entry)
     self.last_entry_t = now()
 
     if self.batch_max_size <= #entries then
-        core.log.debug(fmt("batch processor[%s] batch max size has exceeded", 
self.name))
+        core.log.debug("Batch Processor[", self.name ,"] batch max size has 
exceeded")
         self:process_buffer()
     end
 
@@ -165,8 +162,8 @@ end
 function Batch_Processor:process_buffer()
     -- If entries are present in the buffer move the entries to processing
     if #self.entry_buffer.entries > 0 then
-        core.log.debug(fmt("tranferring buffer entries to processing pipe 
line, buffercount[%d]",
-            #self.entry_buffer.entries))
+        core.log.debug("tranferring buffer entries to processing pipe line, ",
+            "buffercount[", #self.entry_buffer.entries ,"]")
         self.batch_to_process[#self.batch_to_process + 1] = self.entry_buffer
         self.entry_buffer = { entries = {}, retry_count = 0 }
     end
diff --git a/t/utils/batch-processor.t b/t/utils/batch-processor.t
index 4b3a569..2e930d1 100644
--- a/t/utils/batch-processor.t
+++ b/t/utils/batch-processor.t
@@ -122,7 +122,7 @@ GET /t
 --- response_body
 done
 --- error_log
-BatchProcessor[log buffer] buffer duration exceeded, activating buffer flush
+Batch Processor[log buffer] buffer duration exceeded, activating buffer flush
 Batch Processor[log buffer] successfully processed the entries
 --- wait: 3
 
@@ -134,9 +134,9 @@ Batch Processor[log buffer] successfully processed the entries
         content_by_lua_block {
             local Batch = require("apisix.utils.batch-processor")
             local config = {
-                max_retry_count  = 2,
+                max_retry_count = 2,
                 batch_max_size = 2,
-                retry_delay  = 0,
+                retry_delay = 0,
             }
             local func_to_send = function(elements)
                 return true
@@ -157,11 +157,11 @@ GET /t
 --- response_body
 done
 --- no_error_log
-BatchProcessor[log buffer] activating flush due to no activity
+Batch Processor[log buffer] buffer duration exceeded, activating buffer flush
 --- error_log
-batch processor[log buffer] batch max size has exceeded
+Batch Processor[log buffer] batch max size has exceeded
 Batch Processor[log buffer] successfully processed the entries
---- wait: 0.5
+--- wait: 1
 
 
 
@@ -235,7 +235,7 @@ GET /t
 --- response_body
 done
 --- no_error_log
-BatchProcessor[log buffer] buffer duration exceeded, activating buffer flush
+Batch Processor[log buffer] buffer duration exceeded, activating buffer flush
 --- error_log
 Batch Processor[log buffer] failed to process entries
 Batch Processor[log buffer] exceeded the max_retry_count
@@ -278,7 +278,7 @@ GET /t
 --- response_body
 done
 --- no_error_log
-BatchProcessor[log buffer] activating flush due to no activity
+Batch Processor[log buffer] activating flush due to no activity
 --- error_log
 batch[1] sent
 batch[2] sent
@@ -315,8 +315,7 @@ GET /t
 --- response_body
 done
 --- no_error_log
-BatchProcessor[log buffer] activating flush due to no activity
-Batch Processor[log buffer] failed to process entries
+Batch Processor[log buffer] activating flush due to no activity
 --- error_log
 Batch Processor[log buffer] exceeded the max_retry_count
 --- wait: 0.5
@@ -353,7 +352,7 @@ GET /t
 --- response_body
 done
 --- error_log
-BatchProcessor[log buffer] buffer duration exceeded, activating buffer flush
+Batch Processor[log buffer] buffer duration exceeded, activating buffer flush
 Batch Processor[log buffer] successfully processed the entries
 --- wait: 3
 
@@ -392,7 +391,7 @@ GET /t
 --- response_body
 done
 --- no_error_log
-BatchProcessor[log buffer] activating flush due to no activity
+Batch Processor[log buffer] activating flush due to no activity
 --- error_log
 [{"msg":"1"},{"msg":"2"}]
 [{"msg":"3"},{"msg":"4"}]
@@ -435,7 +434,7 @@ GET /t
 --- response_body
 done
 --- no_error_log
-BatchProcessor[log buffer] activating flush due to no activity
+Batch Processor[log buffer] activating flush due to no activity
 --- error_log
-BatchProcessor[log buffer] extending buffer timer
+Batch Processor[log buffer] extending buffer timer
 --- wait: 3

Reply via email to