[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r380403734 ## File path: t/plugin/batch-processor.t ## @@ -0,0 +1,367 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +use t::APISIX 'no_plan'; + +log_level('debug'); +repeat_each(1); +no_long_string(); +no_root_location(); +run_tests; + +__DATA__ + +=== TEST 1: send invalid arguments for constructor +--- config +location /t { +content_by_lua_block { +local Batch = require("apisix.plugins.batch-processor") +local config = { +max_retry_count = 2, +batch_max_size = 1, +process_delay = 0, +retry_delay = 0, +} +local func_to_send = function(elements) +return true +end +local log_buffer, err = Batch:new("", config) + +if log_buffer then +log_buffer:push({hello='world'}) +ngx.say("done") +end + +if not log_buffer then +ngx.say("failed") +end + +} +} +--- request +GET /t +--- response_body +failed +--- wait: 0.5 + + + +=== TEST 2: sanity +--- config +location /t { +content_by_lua_block { +local Batch = require("apisix.plugins.batch-processor") +local func_to_send = function(elements) +return true +end + +local config = { +max_retry_count = 2, +batch_max_size = 1, +process_delay = 0, +retry_delay = 0, +} + +local log_buffer, err = Batch:new(func_to_send, config) + +if not log_buffer then +ngx.say(err) +end + +log_buffer:push({hello='world'}) +ngx.say("done") +} +} +--- request +GET /t +--- response_body +done +--- error_log +Batch Processor[log buffer] successfully processed the entries +--- wait: 0.5 + + + +=== TEST 3: batch processor timeout exceeded +--- config +location /t { +content_by_lua_block { +local Batch = require("apisix.plugins.batch-processor") +local config = { +max_retry_count = 2, +batch_max_size = 2, +process_delay = 0, +retry_delay = 0, +inactive_timeout = 1 +} +local func_to_send = function(elements) +return true +end +local log_buffer, err = Batch:new(func_to_send, config) + +if not log_buffer then +ngx.say(err) +end + +log_buffer:push({hello='world'}) +ngx.say("done") +} +} +--- request +GET /t +--- response_body +done +--- error_log +BatchProcessor[log buffer] buffer duration exceeded, activating buffer flush +Batch Processor[log buffer] successfully processed 
the entries +--- wait: 3 + + + +=== TEST 4: batch processor batch max size exceeded +--- config +location /t { +content_by_lua_block { +local Batch = require("apisix.plugins.batch-processor") +local config = { +max_retry_count = 2, +batch_max_size = 2, +process_delay = 0, +retry_delay = 0, +} +local func_to_send = function(elements) +return true +end +local log_buffer, err = Batch:new(func_to_send, config) + +if not log_buffer then +ngx.say(err) +end + +log_buffer:push({hello='world'}) +log_buffer:push({hello='world'}) +ngx.say("done") +} +} +--- request +GET /t +--- response_body +done +--- no_error_log +BatchProcessor[log buffer] activating flush due to no activity +--- error_log +batch processor[log buffer] batch max size has exceeded +Batch Processor[log buffer] successfully processed the entries +--- wait: 0.5 + + + +=== TEST 5: first failed to process and second try
[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r380403114 ## File path: lua/apisix/plugins/batch-processor.lua ## @@ -0,0 +1,181 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+-- +local core = require("apisix.core") +local setmetatable = setmetatable +local timer_at = ngx.timer.at +local fmt = string.format +local ngx_log = ngx.log +local DEBUG = ngx.DEBUG +local ipairs = ipairs +local table = table +local now = ngx.now +local type = type +local Batch_Processor = {} +local Batch_Processor_mt = { +__index = Batch_Processor +} +local execute_func +local create_buffer_timer + + +local schema = { +type = "object", +properties = { +name = {type = "string", default = "log buffer"}, +max_retry_count = {type = "integer", minimum = 0, default= 0}, +retry_delay = {type = "integer", minimum = 0, default= 1}, +buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted +inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive +batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted +} +} + + +local function schedule_func_exec(batch_processor, delay, batch) +local hdl, err = timer_at(delay, execute_func, batch_processor, batch) +if not hdl then +core.log.error("failed to create process timer: ", err) +return +end +end + + +function execute_func(premature, batch_processor, batch) +if premature then +return +end + +local ok, err = batch_processor.func(batch.entries) +if not ok then +batch.retry_count = batch.retry_count + 1 +if batch.retry_count < batch_processor.max_retry_count then +core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err) +schedule_func_exec(batch_processor, batch_processor.retry_delay, batch) +else +core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", +batch_processor.name, batch.retry_count)) +end +return +end + +core.log.debug(fmt("Batch Processor[%s] successfully processed the entries", 
batch_processor.name)) +end + + +local function flush_buffer(premature, batch_processor) +if premature then +return +end + +if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or +now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then +core.log.debug(fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", +batch_processor.name)) +batch_processor:process_buffer() +batch_processor.isTimerRunning = false +return +end + +-- buffer duration did not exceed or the buffer is active, extending the timer +ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", batch_processor.name)) +create_buffer_timer(batch_processor) +end + + +function create_buffer_timer(batch_processor) +local hdl, err = timer_at(batch_processor.inactive_timeout, flush_buffer, batch_processor) +if not hdl then +core.log.error("failed to create buffer timer: ", err) +return +end +batch_processor.isTimerRunning = true +end + + +function Batch_Processor:new(func, config) +local ok, err = core.schema.check(schema, config) + +if not ok then +return err +end + +if not(type(func) == "function") then +return nil, "Invalid argument, arg #1 must be a function" +end + +local batch_processor = { +func = func, +buffer_duration = config.buffer_duration, +inactive_timeout = config.inactive_timeout, +max_retry_count = config.max_retry_count, +batch_max_size = config.batch_max_size, +retry_delay =
[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r380402546 ## File path: lua/apisix/plugins/batch-processor.lua ## @@ -0,0 +1,179 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+-- +local core = require("apisix.core") +local setmetatable = setmetatable +local timer_at = ngx.timer.at +local remove = table.remove +local fmt = string.format +local ngx_log = ngx.log +local DEBUG = ngx.DEBUG +local table = table +local now = ngx.now +local type = type +local Batch_Processor = {} +local Batch_Processor_mt = { +__index = Batch_Processor +} +local execute_func +local create_buffer_timer + + +local schema = { +type = "object", +properties = { +name = {type = "string", default = "log buffer"}, +max_retry_count = {type = "integer", minimum = 0, default= 0}, +retry_delay = {type = "integer", minimum = 0, default= 1}, +buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted +inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive +batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted +} +} + + +local function schedule_func_exec(batch_processor, delay, batch) +local hdl, err = timer_at(delay, execute_func, batch_processor, batch) +if not hdl then +core.log.error("failed to create process timer: ", err) +return +end +end + + +execute_func = function(premature, batch_processor, batch) +if premature then +return +end + +local ok, err = batch_processor.func(batch.entries) +if ok then +ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name)) + +else +batch.retry_count = batch.retry_count + 1 +if batch.retry_count < batch_processor.max_retry_count then +core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err) +schedule_func_exec(batch_processor, batch_processor.retry_delay, batch) +else +core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, 
batch.retry_count)) +end +end +end + + +local function flush_buffer(premature, batch_processor) + +if premature then +return +end + +if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or +now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then +ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name)) +batch_processor:process_buffer() +batch_processor.isTimerRunning = false +return +end + +-- buffer duration did not exceed or the buffer is active, extending the timer +ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", batch_processor.name)) Review comment: any news? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r379945467 ## File path: lua/apisix/plugins/batch-processor.lua ## @@ -0,0 +1,179 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+-- +local core = require("apisix.core") +local setmetatable = setmetatable +local timer_at = ngx.timer.at +local remove = table.remove +local fmt = string.format +local ngx_log = ngx.log +local DEBUG = ngx.DEBUG +local table = table +local now = ngx.now +local type = type +local Batch_Processor = {} +local Batch_Processor_mt = { +__index = Batch_Processor +} +local execute_func +local create_buffer_timer + + +local schema = { +type = "object", +properties = { +name = {type = "string", default = "log buffer"}, +max_retry_count = {type = "integer", minimum = 0, default= 0}, +retry_delay = {type = "integer", minimum = 0, default= 1}, +buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted +inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive +batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted +} +} + + +local function schedule_func_exec(batch_processor, delay, batch) +local hdl, err = timer_at(delay, execute_func, batch_processor, batch) +if not hdl then +core.log.error("failed to create process timer: ", err) +return +end +end + + +execute_func = function(premature, batch_processor, batch) +if premature then +return +end + +local ok, err = batch_processor.func(batch.entries) +if ok then +ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name)) + +else +batch.retry_count = batch.retry_count + 1 +if batch.retry_count < batch_processor.max_retry_count then +core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err) +schedule_func_exec(batch_processor, batch_processor.retry_delay, batch) +else +core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, 
batch.retry_count)) +end +end +end + + +local function flush_buffer(premature, batch_processor) + +if premature then +return +end + +if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or +now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then +ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name)) +batch_processor:process_buffer() +batch_processor.isTimerRunning = false +return +end + +-- buffer duration did not exceed or the buffer is active, extending the timer +ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", batch_processor.name)) +create_buffer_timer(batch_processor) +end + + +create_buffer_timer = function(batch_processor) Review comment: same problem with function `execute_func` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r379944773 ## File path: lua/apisix/plugins/batch-processor.lua ## @@ -0,0 +1,179 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+-- +local core = require("apisix.core") +local setmetatable = setmetatable +local timer_at = ngx.timer.at +local remove = table.remove +local fmt = string.format +local ngx_log = ngx.log +local DEBUG = ngx.DEBUG +local table = table +local now = ngx.now +local type = type +local Batch_Processor = {} +local Batch_Processor_mt = { +__index = Batch_Processor +} +local execute_func +local create_buffer_timer + + +local schema = { +type = "object", +properties = { +name = {type = "string", default = "log buffer"}, +max_retry_count = {type = "integer", minimum = 0, default= 0}, +retry_delay = {type = "integer", minimum = 0, default= 1}, +buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted +inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive +batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted +} +} + + +local function schedule_func_exec(batch_processor, delay, batch) +local hdl, err = timer_at(delay, execute_func, batch_processor, batch) +if not hdl then +core.log.error("failed to create process timer: ", err) +return +end +end + + +execute_func = function(premature, batch_processor, batch) +if premature then +return +end + +local ok, err = batch_processor.func(batch.entries) +if ok then +ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name)) + +else +batch.retry_count = batch.retry_count + 1 +if batch.retry_count < batch_processor.max_retry_count then +core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err) +schedule_func_exec(batch_processor, batch_processor.retry_delay, batch) +else +core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, 
batch.retry_count)) +end +end +end + + +local function flush_buffer(premature, batch_processor) + +if premature then +return +end + +if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or +now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then +ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name)) +batch_processor:process_buffer() +batch_processor.isTimerRunning = false +return +end + +-- buffer duration did not exceed or the buffer is active, extending the timer +ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", batch_processor.name)) +create_buffer_timer(batch_processor) +end + + +create_buffer_timer = function(batch_processor) Review comment: good style for this case: `function create_buffer_timer(batch_processor)` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r379944685 ## File path: lua/apisix/plugins/batch-processor.lua ## @@ -0,0 +1,179 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+-- +local core = require("apisix.core") +local setmetatable = setmetatable +local timer_at = ngx.timer.at +local remove = table.remove +local fmt = string.format +local ngx_log = ngx.log +local DEBUG = ngx.DEBUG +local table = table +local now = ngx.now +local type = type +local Batch_Processor = {} +local Batch_Processor_mt = { +__index = Batch_Processor +} +local execute_func +local create_buffer_timer + + +local schema = { +type = "object", +properties = { +name = {type = "string", default = "log buffer"}, +max_retry_count = {type = "integer", minimum = 0, default= 0}, +retry_delay = {type = "integer", minimum = 0, default= 1}, +buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted +inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive +batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted +} +} + + +local function schedule_func_exec(batch_processor, delay, batch) +local hdl, err = timer_at(delay, execute_func, batch_processor, batch) +if not hdl then +core.log.error("failed to create process timer: ", err) +return +end +end + + +execute_func = function(premature, batch_processor, batch) +if premature then +return +end + +local ok, err = batch_processor.func(batch.entries) +if ok then +ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name)) + +else +batch.retry_count = batch.retry_count + 1 +if batch.retry_count < batch_processor.max_retry_count then +core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err) +schedule_func_exec(batch_processor, batch_processor.retry_delay, batch) +else +core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, 
batch.retry_count)) +end +end +end + + +local function flush_buffer(premature, batch_processor) + +if premature then +return +end + +if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or +now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then +ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name)) +batch_processor:process_buffer() +batch_processor.isTimerRunning = false +return +end + +-- buffer duration did not exceed or the buffer is active, extending the timer +ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", batch_processor.name)) Review comment: code style: use `core.log.debug(...)` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r379945418 ## File path: lua/apisix/plugins/batch-processor.lua ## @@ -0,0 +1,180 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+-- +local core = require("apisix.core") +local setmetatable = setmetatable +local timer_at = ngx.timer.at +local remove = table.remove +local fmt = string.format +local ngx_log = ngx.log +local DEBUG = ngx.DEBUG +local table = table +local now = ngx.now +local type = type +local Batch_Processor = {} +local Batch_Processor_mt = { +__index = Batch_Processor +} +local execute_func +local create_buffer_timer + + +local schema = { +type = "object", +properties = { +name = {type = "string", default = "log buffer"}, +max_retry_count = {type = "integer", minimum = 0, default= 0}, +retry_delay = {type = "integer", minimum = 0, default= 1}, +buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted +inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive +batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted +} +} + + +local function schedule_func_exec(batch_processor, delay, batch) +local hdl, err = timer_at(delay, execute_func, batch_processor, batch) +if not hdl then +core.log.error("failed to create process timer: ", err) +return +end +end + + +execute_func = function(premature, batch_processor, batch) +if premature then +return +end + +local ok, err = batch_processor.func(batch.entries) +if ok then Review comment: please take a look at this new style: ```lua local ok , err = ... if not ok then batch.retry_count = batch.retry_count + 1 ... end core.log.debug(...) return ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r379944560 ## File path: lua/apisix/plugins/batch-processor.lua ## @@ -0,0 +1,180 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+-- +local core = require("apisix.core") +local setmetatable = setmetatable +local timer_at = ngx.timer.at +local remove = table.remove +local fmt = string.format +local ngx_log = ngx.log +local DEBUG = ngx.DEBUG +local table = table +local now = ngx.now +local type = type +local Batch_Processor = {} +local Batch_Processor_mt = { +__index = Batch_Processor +} +local execute_func +local create_buffer_timer + + +local schema = { +type = "object", +properties = { +name = {type = "string", default = "log buffer"}, +max_retry_count = {type = "integer", minimum = 0, default= 0}, +retry_delay = {type = "integer", minimum = 0, default= 1}, +buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted +inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive +batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted +} +} + + +local function schedule_func_exec(batch_processor, delay, batch) +local hdl, err = timer_at(delay, execute_func, batch_processor, batch) +if not hdl then +core.log.error("failed to create process timer: ", err) +return +end +end + + +execute_func = function(premature, batch_processor, batch) +if premature then +return +end + +local ok, err = batch_processor.func(batch.entries) +if ok then +ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name)) + +else +batch.retry_count = batch.retry_count + 1 +if batch.retry_count < batch_processor.max_retry_count then +core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err) +schedule_func_exec(batch_processor, batch_processor.retry_delay, batch) +else +core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, 
batch.retry_count)) +end +end +end + + +local function flush_buffer(premature, batch_processor) +if premature then +return +end + +if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or +now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then +core.log.debug(fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name)) Review comment: code style: this line is too long. A single line of code should be less than 80 characters; please fix the other similar lines as well. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r379945034 ## File path: lua/apisix/plugins/batch-processor.lua ## @@ -0,0 +1,179 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+-- +local core = require("apisix.core") +local setmetatable = setmetatable +local timer_at = ngx.timer.at +local remove = table.remove +local fmt = string.format +local ngx_log = ngx.log +local DEBUG = ngx.DEBUG +local table = table +local now = ngx.now +local type = type +local Batch_Processor = {} +local Batch_Processor_mt = { +__index = Batch_Processor +} +local execute_func +local create_buffer_timer + + +local schema = { +type = "object", +properties = { +name = {type = "string", default = "log buffer"}, +max_retry_count = {type = "integer", minimum = 0, default= 0}, +retry_delay = {type = "integer", minimum = 0, default= 1}, +buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted +inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive +batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted +} +} + + +local function schedule_func_exec(batch_processor, delay, batch) +local hdl, err = timer_at(delay, execute_func, batch_processor, batch) +if not hdl then +core.log.error("failed to create process timer: ", err) +return +end +end + + +execute_func = function(premature, batch_processor, batch) +if premature then +return +end + +local ok, err = batch_processor.func(batch.entries) +if ok then +ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name)) + +else +batch.retry_count = batch.retry_count + 1 +if batch.retry_count < batch_processor.max_retry_count then +core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err) +schedule_func_exec(batch_processor, batch_processor.retry_delay, batch) +else +core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, 
batch.retry_count)) +end +end +end + + +local function flush_buffer(premature, batch_processor) + +if premature then +return +end + +if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or +now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then +ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name)) +batch_processor:process_buffer() +batch_processor.isTimerRunning = false +return +end + +-- buffer duration did not exceed or the buffer is active, extending the timer +ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", batch_processor.name)) +create_buffer_timer(batch_processor) +end + + +create_buffer_timer = function(batch_processor) +local hdl, err = timer_at(batch_processor.inactive_timeout, flush_buffer, batch_processor) +if not hdl then +core.log.error("failed to create buffer timer: ", err) +return +end +batch_processor.isTimerRunning = true +end + + +function Batch_Processor:new(func, config) +local ok, err = core.schema.check(schema, config) + +if not ok then +return err +end + +if not(type(func) == "function") then +return nil, "Invalid argument, arg #1 must be a function" +end + +local batch_processor = { +func = func, +buffer_duration = config.buffer_duration, +inactive_timeout = config.inactive_timeout, +max_retry_count = config.max_retry_count, +batch_max_size = config.batch_max_size, +retry_delay = config.retry_delay, +name
[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r379900594 ## File path: lua/apisix/plugins/batch-processor.lua ## @@ -0,0 +1,179 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+-- +local core = require("apisix.core") +local setmetatable = setmetatable +local timer_at = ngx.timer.at +local remove = table.remove +local fmt = string.format +local ngx_log = ngx.log +local DEBUG = ngx.DEBUG +local table = table +local now = ngx.now +local type = type +local Batch_Processor = {} +local Batch_Processor_mt = { +__index = Batch_Processor +} +local execute_func +local create_buffer_timer + + +local schema = { +type = "object", +properties = { +name = {type = "string", default = "log buffer"}, +max_retry_count = {type = "integer", minimum = 0, default= 0}, +retry_delay = {type = "integer", minimum = 0, default= 1}, +buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted +inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive +batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted +} +} + + +local function schedule_func_exec(batch_processor, delay, batch) +local hdl, err = timer_at(delay, execute_func, batch_processor, batch) +if not hdl then +core.log.error("failed to create process timer: ", err) +return +end +end + + +execute_func = function(premature, batch_processor, batch) +if premature then +return +end + +local ok, err = batch_processor.func(batch.entries) +if ok then +ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name)) + +else +batch.retry_count = batch.retry_count + 1 +if batch.retry_count < batch_processor.max_retry_count then +core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err) +schedule_func_exec(batch_processor, batch_processor.retry_delay, batch) +else +core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, 
batch.retry_count)) +end +end +end + + +local function flush_buffer(premature, batch_processor) + +if premature then +return +end + +if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or +now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then +ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name)) Review comment: @sshniro #1124 has been merged, so you can continue your work now ^_^ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r379231789 ## File path: lua/apisix/plugins/batch-processor.lua ## @@ -0,0 +1,179 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+-- +local core = require("apisix.core") +local setmetatable = setmetatable +local timer_at = ngx.timer.at +local remove = table.remove +local fmt = string.format +local ngx_log = ngx.log +local DEBUG = ngx.DEBUG +local table = table +local now = ngx.now +local type = type +local Batch_Processor = {} +local Batch_Processor_mt = { +__index = Batch_Processor +} +local execute_func +local create_buffer_timer + + +local schema = { +type = "object", +properties = { +name = {type = "string", default = "log buffer"}, +max_retry_count = {type = "integer", minimum = 0, default= 0}, +retry_delay = {type = "integer", minimum = 0, default= 1}, +buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted +inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive +batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted +} +} + + +local function schedule_func_exec(batch_processor, delay, batch) +local hdl, err = timer_at(delay, execute_func, batch_processor, batch) +if not hdl then +core.log.error("failed to create process timer: ", err) +return +end +end + + +execute_func = function(premature, batch_processor, batch) +if premature then +return +end + +local ok, err = batch_processor.func(batch.entries) +if ok then +ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name)) + +else +batch.retry_count = batch.retry_count + 1 +if batch.retry_count < batch_processor.max_retry_count then +core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err) +schedule_func_exec(batch_processor, batch_processor.retry_delay, batch) +else +core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, 
batch.retry_count)) +end +end +end + + +local function flush_buffer(premature, batch_processor) + +if premature then +return +end + +if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or +now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then +ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name)) Review comment: I have submitted a PR to fix it: https://github.com/apache/incubator-apisix/pull/1124 You can rebase your branch after https://github.com/apache/incubator-apisix/pull/1124 is merged. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r378772964 ## File path: lua/apisix/plugins/batch-processor.lua ## @@ -0,0 +1,179 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+-- +local core = require("apisix.core") +local setmetatable = setmetatable +local timer_at = ngx.timer.at +local remove = table.remove +local fmt = string.format +local ngx_log = ngx.log +local DEBUG = ngx.DEBUG +local table = table +local now = ngx.now +local type = type +local Batch_Processor = {} +local Batch_Processor_mt = { +__index = Batch_Processor +} +local execute_func +local create_buffer_timer + + +local schema = { +type = "object", +properties = { +name = {type = "string", default = "log buffer"}, +max_retry_count = {type = "integer", minimum = 0, default= 0}, +retry_delay = {type = "integer", minimum = 0, default= 1}, +buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted +inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive +batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted +} +} + + +local function schedule_func_exec(batch_processor, delay, batch) +local hdl, err = timer_at(delay, execute_func, batch_processor, batch) +if not hdl then +core.log.error("failed to create process timer: ", err) +return +end +end + + +execute_func = function(premature, batch_processor, batch) +if premature then +return +end + +local ok, err = batch_processor.func(batch.entries) +if ok then +ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name)) + +else +batch.retry_count = batch.retry_count + 1 +if batch.retry_count < batch_processor.max_retry_count then +core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err) +schedule_func_exec(batch_processor, batch_processor.retry_delay, batch) +else +core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, 
batch.retry_count)) +end +end +end + + +local function flush_buffer(premature, batch_processor) + +if premature then +return +end + +if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or +now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then +ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name)) Review comment: That is a bug; I will submit a new PR to fix it later. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r378637883 ## File path: lua/apisix/plugins/batch-processor.lua ## @@ -0,0 +1,179 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+-- +local core = require("apisix.core") +local setmetatable = setmetatable +local timer_at = ngx.timer.at +local remove = table.remove +local fmt = string.format +local ngx_log = ngx.log +local DEBUG = ngx.DEBUG +local table = table +local now = ngx.now +local type = type +local Batch_Processor = {} +local Batch_Processor_mt = { +__index = Batch_Processor +} +local execute_func +local create_buffer_timer + + +local schema = { +type = "object", +properties = { +name = {type = "string", default = "log buffer"}, +max_retry_count = {type = "integer", minimum = 0, default= 0}, +retry_delay = {type = "integer", minimum = 0, default= 1}, +buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted +inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive +batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted +} +} + + +local function schedule_func_exec(batch_processor, delay, batch) +local hdl, err = timer_at(delay, execute_func, batch_processor, batch) +if not hdl then +core.log.error("failed to create process timer: ", err) +return +end +end + + +execute_func = function(premature, batch_processor, batch) +if premature then +return +end + +local ok, err = batch_processor.func(batch.entries) +if ok then +ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name)) + +else +batch.retry_count = batch.retry_count + 1 +if batch.retry_count < batch_processor.max_retry_count then +core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err) +schedule_func_exec(batch_processor, batch_processor.retry_delay, batch) +else +core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, 
batch.retry_count)) +end +end +end + + +local function flush_buffer(premature, batch_processor) + Review comment: please remove this useless blank line This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r378644301 ## File path: lua/apisix/plugins/batch-processor.lua ## @@ -0,0 +1,179 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+-- +local core = require("apisix.core") +local setmetatable = setmetatable +local timer_at = ngx.timer.at +local remove = table.remove +local fmt = string.format +local ngx_log = ngx.log +local DEBUG = ngx.DEBUG +local table = table +local now = ngx.now +local type = type +local Batch_Processor = {} +local Batch_Processor_mt = { +__index = Batch_Processor +} +local execute_func +local create_buffer_timer + + +local schema = { +type = "object", +properties = { +name = {type = "string", default = "log buffer"}, +max_retry_count = {type = "integer", minimum = 0, default= 0}, +retry_delay = {type = "integer", minimum = 0, default= 1}, +buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted +inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive +batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted +} +} + + +local function schedule_func_exec(batch_processor, delay, batch) +local hdl, err = timer_at(delay, execute_func, batch_processor, batch) +if not hdl then +core.log.error("failed to create process timer: ", err) +return +end +end + + +execute_func = function(premature, batch_processor, batch) +if premature then +return +end + +local ok, err = batch_processor.func(batch.entries) +if ok then +ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name)) + +else +batch.retry_count = batch.retry_count + 1 +if batch.retry_count < batch_processor.max_retry_count then +core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err) +schedule_func_exec(batch_processor, batch_processor.retry_delay, batch) +else +core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, 
batch.retry_count)) +end +end +end + + +local function flush_buffer(premature, batch_processor) + +if premature then +return +end + +if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or +now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then +ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name)) +batch_processor:process_buffer() +batch_processor.isTimerRunning = false +return +end + +-- buffer duration did not exceed or the buffer is active, extending the timer +ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", batch_processor.name)) +create_buffer_timer(batch_processor) +end + + +create_buffer_timer = function(batch_processor) +local hdl, err = timer_at(batch_processor.inactive_timeout, flush_buffer, batch_processor) +if not hdl then +core.log.error("failed to create buffer timer: ", err) +return +end +batch_processor.isTimerRunning = true +end + + +function Batch_Processor:new(func, config) +local ok, err = core.schema.check(schema, config) + +if not ok then +return err +end + +if not(type(func) == "function") then +return nil, "Invalid argument, arg #1 must be a function" +end + +local batch_processor = { +func = func, +buffer_duration = config.buffer_duration, +inactive_timeout = config.inactive_timeout, +max_retry_count = config.max_retry_count, +batch_max_size = config.batch_max_size, +retry_delay = config.retry_delay, +name
[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r378637750 ## File path: lua/apisix/plugins/batch-processor.lua ## @@ -0,0 +1,179 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+-- +local core = require("apisix.core") +local setmetatable = setmetatable +local timer_at = ngx.timer.at +local remove = table.remove +local fmt = string.format +local ngx_log = ngx.log +local DEBUG = ngx.DEBUG +local table = table +local now = ngx.now +local type = type +local Batch_Processor = {} +local Batch_Processor_mt = { +__index = Batch_Processor +} +local execute_func +local create_buffer_timer + + +local schema = { +type = "object", +properties = { +name = {type = "string", default = "log buffer"}, +max_retry_count = {type = "integer", minimum = 0, default= 0}, +retry_delay = {type = "integer", minimum = 0, default= 1}, +buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted +inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive +batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted +} +} + + +local function schedule_func_exec(batch_processor, delay, batch) +local hdl, err = timer_at(delay, execute_func, batch_processor, batch) +if not hdl then +core.log.error("failed to create process timer: ", err) +return +end +end + + +execute_func = function(premature, batch_processor, batch) +if premature then +return +end + +local ok, err = batch_processor.func(batch.entries) +if ok then +ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name)) + +else +batch.retry_count = batch.retry_count + 1 +if batch.retry_count < batch_processor.max_retry_count then +core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err) +schedule_func_exec(batch_processor, batch_processor.retry_delay, batch) +else +core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, 
batch.retry_count)) +end +end +end + + +local function flush_buffer(premature, batch_processor) + +if premature then +return +end + +if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or +now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then +ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name)) Review comment: change to `core.log.debug(...)` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r378637838 ## File path: lua/apisix/plugins/batch-processor.lua ## @@ -0,0 +1,179 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+-- +local core = require("apisix.core") +local setmetatable = setmetatable +local timer_at = ngx.timer.at +local remove = table.remove +local fmt = string.format +local ngx_log = ngx.log +local DEBUG = ngx.DEBUG +local table = table +local now = ngx.now +local type = type +local Batch_Processor = {} +local Batch_Processor_mt = { +__index = Batch_Processor +} +local execute_func +local create_buffer_timer + + +local schema = { +type = "object", +properties = { +name = {type = "string", default = "log buffer"}, +max_retry_count = {type = "integer", minimum = 0, default= 0}, +retry_delay = {type = "integer", minimum = 0, default= 1}, +buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted +inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive +batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted +} +} + + +local function schedule_func_exec(batch_processor, delay, batch) +local hdl, err = timer_at(delay, execute_func, batch_processor, batch) +if not hdl then +core.log.error("failed to create process timer: ", err) +return +end +end + + +execute_func = function(premature, batch_processor, batch) +if premature then +return +end + +local ok, err = batch_processor.func(batch.entries) +if ok then +ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name)) + +else +batch.retry_count = batch.retry_count + 1 +if batch.retry_count < batch_processor.max_retry_count then +core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err) +schedule_func_exec(batch_processor, batch_processor.retry_delay, batch) +else +core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, 
batch.retry_count)) +end +end +end + + +local function flush_buffer(premature, batch_processor) + +if premature then +return +end + +if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or +now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then +ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name)) +batch_processor:process_buffer() +batch_processor.isTimerRunning = false +return +end + +-- buffer duration did not exceed or the buffer is active, extending the timer +ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", batch_processor.name)) Review comment: ditto This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r378643680 ## File path: lua/apisix/plugins/batch-processor.lua ## @@ -0,0 +1,179 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+-- +local core = require("apisix.core") +local setmetatable = setmetatable +local timer_at = ngx.timer.at +local remove = table.remove +local fmt = string.format +local ngx_log = ngx.log +local DEBUG = ngx.DEBUG +local table = table +local now = ngx.now +local type = type +local Batch_Processor = {} +local Batch_Processor_mt = { +__index = Batch_Processor +} +local execute_func +local create_buffer_timer + + +local schema = { +type = "object", +properties = { +name = {type = "string", default = "log buffer"}, +max_retry_count = {type = "integer", minimum = 0, default= 0}, +retry_delay = {type = "integer", minimum = 0, default= 1}, +buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted +inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive +batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted +} +} + + +local function schedule_func_exec(batch_processor, delay, batch) +local hdl, err = timer_at(delay, execute_func, batch_processor, batch) +if not hdl then +core.log.error("failed to create process timer: ", err) +return +end +end + + +execute_func = function(premature, batch_processor, batch) +if premature then +return +end + +local ok, err = batch_processor.func(batch.entries) +if ok then +ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name)) + +else +batch.retry_count = batch.retry_count + 1 +if batch.retry_count < batch_processor.max_retry_count then +core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err) +schedule_func_exec(batch_processor, batch_processor.retry_delay, batch) +else +core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, 
batch.retry_count)) +end +end +end + + +local function flush_buffer(premature, batch_processor) + +if premature then +return +end + +if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or +now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then +ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name)) +batch_processor:process_buffer() +batch_processor.isTimerRunning = false +return +end + +-- buffer duration did not exceed or the buffer is active, extending the timer +ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", batch_processor.name)) +create_buffer_timer(batch_processor) +end + + +create_buffer_timer = function(batch_processor) +local hdl, err = timer_at(batch_processor.inactive_timeout, flush_buffer, batch_processor) +if not hdl then +core.log.error("failed to create buffer timer: ", err) +return +end +batch_processor.isTimerRunning = true +end + + +function Batch_Processor:new(func, config) +local ok, err = core.schema.check(schema, config) + +if not ok then +return err +end + +if not(type(func) == "function") then +return nil, "Invalid argument, arg #1 must be a function" +end + +local batch_processor = { +func = func, +buffer_duration = config.buffer_duration, +inactive_timeout = config.inactive_timeout, +max_retry_count = config.max_retry_count, +batch_max_size = config.batch_max_size, +retry_delay = config.retry_delay, +name
[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r378638012 ## File path: lua/apisix/plugins/batch-processor.lua ## @@ -0,0 +1,179 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+-- +local core = require("apisix.core") +local setmetatable = setmetatable +local timer_at = ngx.timer.at +local remove = table.remove +local fmt = string.format +local ngx_log = ngx.log +local DEBUG = ngx.DEBUG +local table = table +local now = ngx.now +local type = type +local Batch_Processor = {} +local Batch_Processor_mt = { +__index = Batch_Processor +} +local execute_func +local create_buffer_timer + + +local schema = { +type = "object", +properties = { +name = {type = "string", default = "log buffer"}, +max_retry_count = {type = "integer", minimum = 0, default= 0}, +retry_delay = {type = "integer", minimum = 0, default= 1}, +buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted +inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive +batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted +} +} + + +local function schedule_func_exec(batch_processor, delay, batch) +local hdl, err = timer_at(delay, execute_func, batch_processor, batch) +if not hdl then +core.log.error("failed to create process timer: ", err) +return +end +end + + +execute_func = function(premature, batch_processor, batch) +if premature then +return +end + +local ok, err = batch_processor.func(batch.entries) +if ok then +ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name)) + +else +batch.retry_count = batch.retry_count + 1 +if batch.retry_count < batch_processor.max_retry_count then +core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err) +schedule_func_exec(batch_processor, batch_processor.retry_delay, batch) +else +core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, 
batch.retry_count)) +end +end +end + + +local function flush_buffer(premature, batch_processor) + +if premature then +return +end + +if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or +now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then +ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name)) +batch_processor:process_buffer() +batch_processor.isTimerRunning = false +return +end + +-- buffer duration did not exceed or the buffer is active, extending the timer +ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", batch_processor.name)) +create_buffer_timer(batch_processor) +end + + +create_buffer_timer = function(batch_processor) Review comment: code style: `local function create_buffer_timer()` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r378643354 ## File path: lua/apisix/plugins/batch-processor.lua ## @@ -0,0 +1,179 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+-- +local core = require("apisix.core") +local setmetatable = setmetatable +local timer_at = ngx.timer.at +local remove = table.remove +local fmt = string.format +local ngx_log = ngx.log +local DEBUG = ngx.DEBUG +local table = table +local now = ngx.now +local type = type +local Batch_Processor = {} +local Batch_Processor_mt = { +__index = Batch_Processor +} +local execute_func +local create_buffer_timer + + +local schema = { +type = "object", +properties = { +name = {type = "string", default = "log buffer"}, +max_retry_count = {type = "integer", minimum = 0, default= 0}, +retry_delay = {type = "integer", minimum = 0, default= 1}, +buffer_duration = {type = "integer", minimum = 1, default= 60}, -- maximum age in seconds of the oldest log item in a batch before the batch must be transmitted +inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- maximum age in seconds when the buffer will be flushed if inactive +batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- maximum number of entries in a batch before the batch must be transmitted +} +} + + +local function schedule_func_exec(batch_processor, delay, batch) +local hdl, err = timer_at(delay, execute_func, batch_processor, batch) +if not hdl then +core.log.error("failed to create process timer: ", err) +return +end +end + + +execute_func = function(premature, batch_processor, batch) +if premature then +return +end + +local ok, err = batch_processor.func(batch.entries) +if ok then +ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the entries", batch_processor.name)) + +else +batch.retry_count = batch.retry_count + 1 +if batch.retry_count < batch_processor.max_retry_count then +core.log.warn(fmt("Batch Processor[%s] failed to process entries: ", batch_processor.name), err) +schedule_func_exec(batch_processor, batch_processor.retry_delay, batch) +else +core.log.error(fmt("Batch Processor[%s] exceeded the max_retry_count[%d], dropping the entries", batch_processor.name, 
batch.retry_count)) +end +end +end + + +local function flush_buffer(premature, batch_processor) + +if premature then +return +end + +if now() - batch_processor.last_entry_t >= batch_processor.inactive_timeout or +now() - batch_processor.first_entry_t >= batch_processor.buffer_duration then +ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, activating buffer flush", batch_processor.name)) +batch_processor:process_buffer() +batch_processor.isTimerRunning = false +return +end + +-- buffer duration did not exceed or the buffer is active, extending the timer +ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", batch_processor.name)) +create_buffer_timer(batch_processor) +end + + +create_buffer_timer = function(batch_processor) +local hdl, err = timer_at(batch_processor.inactive_timeout, flush_buffer, batch_processor) +if not hdl then +core.log.error("failed to create buffer timer: ", err) +return +end +batch_processor.isTimerRunning = true +end + + +function Batch_Processor:new(func, config) +local ok, err = core.schema.check(schema, config) + +if not ok then +return err +end + +if not(type(func) == "function") then +return nil, "Invalid argument, arg #1 must be a function" +end + +local batch_processor = { +func = func, +buffer_duration = config.buffer_duration, +inactive_timeout = config.inactive_timeout, +max_retry_count = config.max_retry_count, +batch_max_size = config.batch_max_size, +retry_delay = config.retry_delay, +name
[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch
membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r378644474 ## File path: t/plugin/batch-processor-refac.t ## @@ -0,0 +1,374 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +use t::APISIX 'no_plan'; + +log_level('debug'); +repeat_each(1); +no_long_string(); +no_root_location(); +run_tests; + +__DATA__ + +=== TEST 1: wrong configuration parameters +--- config +location /t { +content_by_lua_block { +local Batch = require("apisix.plugins.batch-processor") +local func_to_send = function(elements) +return true +end + +local config = { +max_retry_count = 2, +batch_max_size = 1, +process_delay = 0, +retry_delay = 0, +} + +local log_buffer, err = Batch:new("", config) + +if log_buffer then +log_buffer:push({hello='world'}) +ngx.say("done") +end + +if not log_buffer then +ngx.say("failed") +end + +} +} +--- request +GET /t +--- response_body +failed +--- wait: 0.5 + + +=== TEST 2: sanity +--- config +location /t { +content_by_lua_block { +local Batch = require("apisix.plugins.batch-processor") +local func_to_send = function(elements) +return true +end + +local config = { +max_retry_count = 2, +batch_max_size = 1, +process_delay = 0, +retry_delay = 0, +} + +local log_buffer, err = Batch:new(func_to_send, config) + +if not log_buffer then +ngx.say(err) +end + +log_buffer:push({hello='world'}) +ngx.say("done") +} +} +--- request +GET /t +--- response_body +done +--- error_log +Batch Processor[log buffer] successfully processed the entries +--- wait: 0.5 + +=== TEST 3: batch processor timeout exceeded Review comment: https://github.com/apache/incubator-apisix/blob/fda20d99d55d91905622b9d780e4dce79d128e76/Contributing.md#check-code-style-and-test-case-style check your code style and test case style This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services