This is an automated email from the ASF dual-hosted git repository. wenming pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-apisix.git
The following commit(s) were added to refs/heads/master by this push: new 52934b5 Batch Processor: Fix for rescheduling execution for max_retry_count is 0 and 1 (#1349) 52934b5 is described below commit 52934b5fc78e4628ccdac35670438864fce90396 Author: Nirojan Selvanathan <sshn...@gmail.com> AuthorDate: Mon Mar 30 03:44:33 2020 +0200 Batch Processor: Fix for rescheduling execution for max_retry_count is 0 and 1 (#1349) --- lua/apisix/utils/batch-processor.lua | 27 ++++++++++++--------------- t/utils/batch-processor.t | 27 +++++++++++++-------------- 2 files changed, 25 insertions(+), 29 deletions(-) diff --git a/lua/apisix/utils/batch-processor.lua b/lua/apisix/utils/batch-processor.lua index 4fcbe6e..4196095 100644 --- a/lua/apisix/utils/batch-processor.lua +++ b/lua/apisix/utils/batch-processor.lua @@ -17,7 +17,6 @@ local core = require("apisix.core") local setmetatable = setmetatable local timer_at = ngx.timer.at -local fmt = string.format local ipairs = ipairs local table = table local now = ngx.now @@ -57,22 +56,20 @@ function execute_func(premature, batch_processor, batch) return end - local ok, err = batch_processor.func(batch.entries) + local ok, err = batch_processor.func(batch.entries, batch_processor.batch_max_size) if not ok then + core.log.error("Batch Processor[", batch_processor.name, "] failed to process entries: ", err) batch.retry_count = batch.retry_count + 1 - if batch.retry_count < batch_processor.max_retry_count then - core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", - batch_processor.name), err) + if batch.retry_count <= batch_processor.max_retry_count then schedule_func_exec(batch_processor, batch_processor.retry_delay, batch) else - core.log.error(fmt(("Batch Processor[%s] exceeded the max_retry_count[%d] " - .. "dropping the entries"), batch_processor.name, batch.retry_count)) + core.log.error("Batch Processor[", batch_processor.name,"] exceeded ", + "the max_retry_count[", batch.retry_count,"] dropping the entries") end return end - core.log.debug(fmt("Batch Processor[%s] successfully processed the entries", - batch_processor.name)) + core.log.debug("Batch Processor[", batch_processor.name ,"] successfully processed the entries") end @@ -83,15 +80,15 @@ local function flush_buffer(premature, batch_processor) if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then - core.log.debug(fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", - batch_processor.name)) + core.log.debug("Batch Processor[", batch_processor.name ,"] buffer ", + "duration exceeded, activating buffer flush") batch_processor:process_buffer() batch_processor.is_timer_running = false return end -- buffer duration did not exceed or the buffer is active, extending the timer - core.log.debug(fmt("BatchProcessor[%s] extending buffer timer", batch_processor.name)) + core.log.debug("Batch Processor[", batch_processor.name ,"] extending buffer timer") create_buffer_timer(batch_processor) end @@ -152,7 +149,7 @@ function Batch_Processor:push(entry) self.last_entry_t = now() if self.batch_max_size <= #entries then - core.log.debug(fmt("batch processor[%s] batch max size has exceeded", self.name)) + core.log.debug("Batch Processor[", self.name ,"] batch max size has exceeded") self:process_buffer() end @@ -165,8 +162,8 @@ end function Batch_Processor:process_buffer() -- If entries are present in the buffer move the entries to processing if #self.entry_buffer.entries > 0 then - core.log.debug(fmt("tranferring buffer entries to processing pipe line, buffercount[%d]", - #self.entry_buffer.entries)) + core.log.debug("tranferring buffer entries to processing pipe line, ", + "buffercount[", #self.entry_buffer.entries ,"]") self.batch_to_process[#self.batch_to_process + 1] = self.entry_buffer self.entry_buffer = { entries = {}, retry_count = 0 } end diff --git a/t/utils/batch-processor.t b/t/utils/batch-processor.t index 4b3a569..2e930d1 100644 --- a/t/utils/batch-processor.t +++ b/t/utils/batch-processor.t @@ -122,7 +122,7 @@ GET /t --- response_body done --- error_log -BatchProcessor[log buffer] buffer duration exceeded, activating buffer flush +Batch Processor[log buffer] buffer duration exceeded, activating buffer flush Batch Processor[log buffer] successfully processed the entries --- wait: 3 @@ -134,9 +134,9 @@ Batch Processor[log buffer] successfully processed the entries content_by_lua_block { local Batch = require("apisix.utils.batch-processor") local config = { - max_retry_count = 2, + max_retry_count = 2, batch_max_size = 2, - retry_delay = 0, + retry_delay = 0, } local func_to_send = function(elements) return true @@ -157,11 +157,11 @@ GET /t --- response_body done --- no_error_log -BatchProcessor[log buffer] activating flush due to no activity +Batch Processor[log buffer] buffer duration exceeded, activating buffer flush --- error_log -batch processor[log buffer] batch max size has exceeded +Batch Processor[log buffer] batch max size has exceeded Batch Processor[log buffer] successfully processed the entries ---- wait: 0.5 +--- wait: 1 @@ -235,7 +235,7 @@ GET /t --- response_body done --- no_error_log -BatchProcessor[log buffer] buffer duration exceeded, activating buffer flush +Batch Processor[log buffer] buffer duration exceeded, activating buffer flush --- error_log Batch Processor[log buffer] failed to process entries Batch Processor[log buffer] exceeded the max_retry_count @@ -278,7 +278,7 @@ GET /t --- response_body done --- no_error_log -BatchProcessor[log buffer] activating flush due to no activity +Batch Processor[log buffer] activating flush due to no activity --- error_log batch[1] sent batch[2] sent @@ -315,8 +315,7 @@ GET /t --- response_body done --- no_error_log -BatchProcessor[log buffer] activating flush due to no activity -Batch Processor[log buffer] failed to process entries +Batch Processor[log buffer] activating flush due to no activity --- error_log Batch Processor[log buffer] exceeded the max_retry_count --- wait: 0.5 @@ -353,7 +352,7 @@ GET /t --- response_body done --- error_log -BatchProcessor[log buffer] buffer duration exceeded, activating buffer flush +Batch Processor[log buffer] buffer duration exceeded, activating buffer flush Batch Processor[log buffer] successfully processed the entries --- wait: 3 @@ -392,7 +391,7 @@ GET /t --- response_body done --- no_error_log -BatchProcessor[log buffer] activating flush due to no activity +Batch Processor[log buffer] activating flush due to no activity --- error_log [{"msg":"1"},{"msg":"2"}] [{"msg":"3"},{"msg":"4"}] @@ -435,7 +434,7 @@ GET /t --- response_body done --- no_error_log -BatchProcessor[log buffer] activating flush due to no activity +Batch Processor[log buffer] activating flush due to no activity --- error_log -BatchProcessor[log buffer] extending buffer timer +Batch Processor[log buffer] extending buffer timer --- wait: 3