[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

2020-02-17 Thread GitBox
membphis commented on a change in pull request #1121: Batch processor 
implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r380403734
 
 

 ##
 File path: t/plugin/batch-processor.t
 ##
 @@ -0,0 +1,367 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+log_level('debug');
+repeat_each(1);
+no_long_string();
+no_root_location();
+run_tests;
+
+__DATA__
+
+=== TEST 1: send invalid arguments for constructor
+--- config
+location /t {
+content_by_lua_block {
+local Batch = require("apisix.plugins.batch-processor")
+local config = {
+max_retry_count  = 2,
+batch_max_size = 1,
+process_delay  = 0,
+retry_delay  = 0,
+}
+local func_to_send = function(elements)
+return true
+end
+local log_buffer, err = Batch:new("", config)
+
+if log_buffer then
+log_buffer:push({hello='world'})
+ngx.say("done")
+end
+
+if not log_buffer then
+ngx.say("failed")
+end
+
+}
+}
+--- request
+GET /t
+--- response_body
+failed
+--- wait: 0.5
+
+
+
+=== TEST 2: sanity
+--- config
+location /t {
+content_by_lua_block {
+local Batch = require("apisix.plugins.batch-processor")
+local func_to_send = function(elements)
+return true
+end
+
+local config = {
+max_retry_count  = 2,
+batch_max_size = 1,
+process_delay  = 0,
+retry_delay  = 0,
+}
+
+local log_buffer, err = Batch:new(func_to_send, config)
+
+if not log_buffer then
+ngx.say(err)
+end
+
+log_buffer:push({hello='world'})
+ngx.say("done")
+}
+}
+--- request
+GET /t
+--- response_body
+done
+--- error_log
+Batch Processor[log buffer] successfully processed the entries
+--- wait: 0.5
+
+
+
+=== TEST 3: batch processor timeout exceeded
+--- config
+location /t {
+content_by_lua_block {
+local Batch = require("apisix.plugins.batch-processor")
+local config = {
+max_retry_count  = 2,
+batch_max_size = 2,
+process_delay  = 0,
+retry_delay  = 0,
+inactive_timeout = 1
+}
+local func_to_send = function(elements)
+return true
+end
+local log_buffer, err = Batch:new(func_to_send, config)
+
+if not log_buffer then
+ngx.say(err)
+end
+
+log_buffer:push({hello='world'})
+ngx.say("done")
+}
+}
+--- request
+GET /t
+--- response_body
+done
+--- error_log
+BatchProcessor[log buffer] buffer duration exceeded, activating buffer flush
+Batch Processor[log buffer] successfully processed the entries
+--- wait: 3
+
+
+
+=== TEST 4: batch processor batch max size exceeded
+--- config
+location /t {
+content_by_lua_block {
+local Batch = require("apisix.plugins.batch-processor")
+local config = {
+max_retry_count  = 2,
+batch_max_size = 2,
+process_delay  = 0,
+retry_delay  = 0,
+}
+local func_to_send = function(elements)
+return true
+end
+local log_buffer, err = Batch:new(func_to_send, config)
+
+if not log_buffer then
+ngx.say(err)
+end
+
+log_buffer:push({hello='world'})
+log_buffer:push({hello='world'})
+ngx.say("done")
+}
+}
+--- request
+GET /t
+--- response_body
+done
+--- no_error_log
+BatchProcessor[log buffer] activating flush due to no activity
+--- error_log
+batch processor[log buffer] batch max size has exceeded
+Batch Processor[log buffer] successfully processed the entries
+--- wait: 0.5
+
+
+
+=== TEST 5: first failed to process and second try 

[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

2020-02-17 Thread GitBox
membphis commented on a change in pull request #1121: Batch processor 
implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r380403114
 
 

 ##
 File path: lua/apisix/plugins/batch-processor.lua
 ##
 @@ -0,0 +1,181 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local ipairs = ipairs
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+__index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+type = "object",
+properties = {
+name = {type = "string", default = "log buffer"},
+max_retry_count = {type = "integer", minimum = 0, default= 0},
+retry_delay = {type = "integer", minimum = 0, default= 1},
+buffer_duration = {type = "integer", minimum = 1, default= 60}, -- 
maximum age in seconds of the oldest log item in a batch before the batch must 
be transmitted
+inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- 
maximum age in seconds when the buffer will be flushed if inactive
+batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- 
maximum number of entries in a batch before the batch must be transmitted
+}
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+if not hdl then
+core.log.error("failed to create process timer: ", err)
+return
+end
+end
+
+
+function execute_func(premature, batch_processor, batch)
+if premature then
+return
+end
+
+local ok, err = batch_processor.func(batch.entries)
+if not ok then
+batch.retry_count = batch.retry_count + 1
+if batch.retry_count < batch_processor.max_retry_count then
+core.log.warn(fmt("Batch Processor[%s] failed to process entries: 
", batch_processor.name), err)
+schedule_func_exec(batch_processor, batch_processor.retry_delay, 
batch)
+else
+core.log.error(fmt("Batch Processor[%s] exceeded the 
max_retry_count[%d], dropping the entries",
+batch_processor.name, batch.retry_count))
+end
+return
+end
+
+core.log.debug(fmt("Batch Processor[%s] successfully processed the 
entries", batch_processor.name))
+end
+
+
+local function flush_buffer(premature, batch_processor)
+if premature then
+return
+end
+
+if now() - batch_processor.last_entry_t >= 
batch_processor.inactive_timeout or
+now() - batch_processor.first_entry_t >= 
batch_processor.buffer_duration then
+core.log.debug(fmt("BatchProcessor[%s] buffer duration exceeded, 
activating buffer flush",
+batch_processor.name))
+batch_processor:process_buffer()
+batch_processor.isTimerRunning = false
+return
+end
+
+-- buffer duration did not exceed or the buffer is active, extending the 
timer
+ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", 
batch_processor.name))
+create_buffer_timer(batch_processor)
+end
+
+
+function create_buffer_timer(batch_processor)
+local hdl, err = timer_at(batch_processor.inactive_timeout, flush_buffer, 
batch_processor)
+if not hdl then
+core.log.error("failed to create buffer timer: ", err)
+return
+end
+batch_processor.isTimerRunning = true
+end
+
+
+function Batch_Processor:new(func, config)
+local ok, err = core.schema.check(schema, config)
+
+if not ok then
+return err
+end
+
+if not(type(func) == "function") then
+return nil, "Invalid argument, arg #1 must be a function"
+end
+
+local batch_processor = {
+func = func,
+buffer_duration = config.buffer_duration,
+inactive_timeout = config.inactive_timeout,
+max_retry_count = config.max_retry_count,
+batch_max_size = config.batch_max_size,
+retry_delay = 

[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

2020-02-17 Thread GitBox
membphis commented on a change in pull request #1121: Batch processor 
implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r380402546
 
 

 ##
 File path: lua/apisix/plugins/batch-processor.lua
 ##
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+__index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+type = "object",
+properties = {
+name = {type = "string", default = "log buffer"},
+max_retry_count = {type = "integer", minimum = 0, default= 0},
+retry_delay = {type = "integer", minimum = 0, default= 1},
+buffer_duration = {type = "integer", minimum = 1, default= 60}, -- 
maximum age in seconds of the oldest log item in a batch before the batch must 
be transmitted
+inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- 
maximum age in seconds when the buffer will be flushed if inactive
+batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- 
maximum number of entries in a batch before the batch must be transmitted
+}
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+if not hdl then
+core.log.error("failed to create process timer: ", err)
+return
+end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+if premature then
+return
+end
+
+local ok, err = batch_processor.func(batch.entries)
+if ok then
+ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the 
entries", batch_processor.name))
+
+else
+batch.retry_count = batch.retry_count + 1
+if batch.retry_count < batch_processor.max_retry_count then
+core.log.warn(fmt("Batch Processor[%s] failed to process entries: 
", batch_processor.name), err)
+schedule_func_exec(batch_processor, batch_processor.retry_delay, 
batch)
+else
+core.log.error(fmt("Batch Processor[%s] exceeded the 
max_retry_count[%d], dropping the entries", batch_processor.name, 
batch.retry_count))
+end
+end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+if premature then
+return
+end
+
+if now() - batch_processor.last_entry_t >= 
batch_processor.inactive_timeout or
+now() - batch_processor.first_entry_t >= 
batch_processor.buffer_duration then
+ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, 
activating buffer flush", batch_processor.name))
+batch_processor:process_buffer()
+batch_processor.isTimerRunning = false
+return
+end
+
+-- buffer duration did not exceed or the buffer is active, extending the 
timer
+ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", 
batch_processor.name))
 
 Review comment:
   any news?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

2020-02-16 Thread GitBox
membphis commented on a change in pull request #1121: Batch processor 
implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r379945467
 
 

 ##
 File path: lua/apisix/plugins/batch-processor.lua
 ##
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+__index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+type = "object",
+properties = {
+name = {type = "string", default = "log buffer"},
+max_retry_count = {type = "integer", minimum = 0, default= 0},
+retry_delay = {type = "integer", minimum = 0, default= 1},
+buffer_duration = {type = "integer", minimum = 1, default= 60}, -- 
maximum age in seconds of the oldest log item in a batch before the batch must 
be transmitted
+inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- 
maximum age in seconds when the buffer will be flushed if inactive
+batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- 
maximum number of entries in a batch before the batch must be transmitted
+}
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+if not hdl then
+core.log.error("failed to create process timer: ", err)
+return
+end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+if premature then
+return
+end
+
+local ok, err = batch_processor.func(batch.entries)
+if ok then
+ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the 
entries", batch_processor.name))
+
+else
+batch.retry_count = batch.retry_count + 1
+if batch.retry_count < batch_processor.max_retry_count then
+core.log.warn(fmt("Batch Processor[%s] failed to process entries: 
", batch_processor.name), err)
+schedule_func_exec(batch_processor, batch_processor.retry_delay, 
batch)
+else
+core.log.error(fmt("Batch Processor[%s] exceeded the 
max_retry_count[%d], dropping the entries", batch_processor.name, 
batch.retry_count))
+end
+end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+if premature then
+return
+end
+
+if now() - batch_processor.last_entry_t >= 
batch_processor.inactive_timeout or
+now() - batch_processor.first_entry_t >= 
batch_processor.buffer_duration then
+ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, 
activating buffer flush", batch_processor.name))
+batch_processor:process_buffer()
+batch_processor.isTimerRunning = false
+return
+end
+
+-- buffer duration did not exceed or the buffer is active, extending the 
timer
+ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", 
batch_processor.name))
+create_buffer_timer(batch_processor)
+end
+
+
+create_buffer_timer = function(batch_processor)
 
 Review comment:
   same style problem as with the function `execute_func`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

2020-02-16 Thread GitBox
membphis commented on a change in pull request #1121: Batch processor 
implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r379944773
 
 

 ##
 File path: lua/apisix/plugins/batch-processor.lua
 ##
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+__index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+type = "object",
+properties = {
+name = {type = "string", default = "log buffer"},
+max_retry_count = {type = "integer", minimum = 0, default= 0},
+retry_delay = {type = "integer", minimum = 0, default= 1},
+buffer_duration = {type = "integer", minimum = 1, default= 60}, -- 
maximum age in seconds of the oldest log item in a batch before the batch must 
be transmitted
+inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- 
maximum age in seconds when the buffer will be flushed if inactive
+batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- 
maximum number of entries in a batch before the batch must be transmitted
+}
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+if not hdl then
+core.log.error("failed to create process timer: ", err)
+return
+end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+if premature then
+return
+end
+
+local ok, err = batch_processor.func(batch.entries)
+if ok then
+ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the 
entries", batch_processor.name))
+
+else
+batch.retry_count = batch.retry_count + 1
+if batch.retry_count < batch_processor.max_retry_count then
+core.log.warn(fmt("Batch Processor[%s] failed to process entries: 
", batch_processor.name), err)
+schedule_func_exec(batch_processor, batch_processor.retry_delay, 
batch)
+else
+core.log.error(fmt("Batch Processor[%s] exceeded the 
max_retry_count[%d], dropping the entries", batch_processor.name, 
batch.retry_count))
+end
+end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+if premature then
+return
+end
+
+if now() - batch_processor.last_entry_t >= 
batch_processor.inactive_timeout or
+now() - batch_processor.first_entry_t >= 
batch_processor.buffer_duration then
+ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, 
activating buffer flush", batch_processor.name))
+batch_processor:process_buffer()
+batch_processor.isTimerRunning = false
+return
+end
+
+-- buffer duration did not exceed or the buffer is active, extending the 
timer
+ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", 
batch_processor.name))
+create_buffer_timer(batch_processor)
+end
+
+
+create_buffer_timer = function(batch_processor)
 
 Review comment:
   better style for this case: `function create_buffer_timer(batch_processor)`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

2020-02-16 Thread GitBox
membphis commented on a change in pull request #1121: Batch processor 
implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r379944685
 
 

 ##
 File path: lua/apisix/plugins/batch-processor.lua
 ##
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+__index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+type = "object",
+properties = {
+name = {type = "string", default = "log buffer"},
+max_retry_count = {type = "integer", minimum = 0, default= 0},
+retry_delay = {type = "integer", minimum = 0, default= 1},
+buffer_duration = {type = "integer", minimum = 1, default= 60}, -- 
maximum age in seconds of the oldest log item in a batch before the batch must 
be transmitted
+inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- 
maximum age in seconds when the buffer will be flushed if inactive
+batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- 
maximum number of entries in a batch before the batch must be transmitted
+}
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+if not hdl then
+core.log.error("failed to create process timer: ", err)
+return
+end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+if premature then
+return
+end
+
+local ok, err = batch_processor.func(batch.entries)
+if ok then
+ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the 
entries", batch_processor.name))
+
+else
+batch.retry_count = batch.retry_count + 1
+if batch.retry_count < batch_processor.max_retry_count then
+core.log.warn(fmt("Batch Processor[%s] failed to process entries: 
", batch_processor.name), err)
+schedule_func_exec(batch_processor, batch_processor.retry_delay, 
batch)
+else
+core.log.error(fmt("Batch Processor[%s] exceeded the 
max_retry_count[%d], dropping the entries", batch_processor.name, 
batch.retry_count))
+end
+end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+if premature then
+return
+end
+
+if now() - batch_processor.last_entry_t >= 
batch_processor.inactive_timeout or
+now() - batch_processor.first_entry_t >= 
batch_processor.buffer_duration then
+ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, 
activating buffer flush", batch_processor.name))
+batch_processor:process_buffer()
+batch_processor.isTimerRunning = false
+return
+end
+
+-- buffer duration did not exceed or the buffer is active, extending the 
timer
+ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", 
batch_processor.name))
 
 Review comment:
   code style: use `core.log.debug(...)` here


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

2020-02-16 Thread GitBox
membphis commented on a change in pull request #1121: Batch processor 
implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r379945418
 
 

 ##
 File path: lua/apisix/plugins/batch-processor.lua
 ##
 @@ -0,0 +1,180 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+__index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+type = "object",
+properties = {
+name = {type = "string", default = "log buffer"},
+max_retry_count = {type = "integer", minimum = 0, default= 0},
+retry_delay = {type = "integer", minimum = 0, default= 1},
+buffer_duration = {type = "integer", minimum = 1, default= 60}, -- 
maximum age in seconds of the oldest log item in a batch before the batch must 
be transmitted
+inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- 
maximum age in seconds when the buffer will be flushed if inactive
+batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- 
maximum number of entries in a batch before the batch must be transmitted
+}
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+if not hdl then
+core.log.error("failed to create process timer: ", err)
+return
+end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+if premature then
+return
+end
+
+local ok, err = batch_processor.func(batch.entries)
+if ok then
 
 Review comment:
   please take a look at this new style:
   
   ```lua
   local ok, err = ...
   if not ok then
   batch.retry_count = batch.retry_count + 1
   ... 
   end
   
   core.log.debug(...)
   return
   ```
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

2020-02-16 Thread GitBox
membphis commented on a change in pull request #1121: Batch processor 
implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r379944560
 
 

 ##
 File path: lua/apisix/plugins/batch-processor.lua
 ##
 @@ -0,0 +1,180 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+__index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+type = "object",
+properties = {
+name = {type = "string", default = "log buffer"},
+max_retry_count = {type = "integer", minimum = 0, default= 0},
+retry_delay = {type = "integer", minimum = 0, default= 1},
+buffer_duration = {type = "integer", minimum = 1, default= 60}, -- 
maximum age in seconds of the oldest log item in a batch before the batch must 
be transmitted
+inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- 
maximum age in seconds when the buffer will be flushed if inactive
+batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- 
maximum number of entries in a batch before the batch must be transmitted
+}
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+if not hdl then
+core.log.error("failed to create process timer: ", err)
+return
+end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+if premature then
+return
+end
+
+local ok, err = batch_processor.func(batch.entries)
+if ok then
+ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the 
entries", batch_processor.name))
+
+else
+batch.retry_count = batch.retry_count + 1
+if batch.retry_count < batch_processor.max_retry_count then
+core.log.warn(fmt("Batch Processor[%s] failed to process entries: 
", batch_processor.name), err)
+schedule_func_exec(batch_processor, batch_processor.retry_delay, 
batch)
+else
+core.log.error(fmt("Batch Processor[%s] exceeded the 
max_retry_count[%d], dropping the entries", batch_processor.name, 
batch.retry_count))
+end
+end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+if premature then
+return
+end
+
+if now() - batch_processor.last_entry_t >= 
batch_processor.inactive_timeout or
+now() - batch_processor.first_entry_t >= 
batch_processor.buffer_duration then
+core.log.debug(fmt("BatchProcessor[%s] buffer duration exceeded, 
activating buffer flush", batch_processor.name))
 
 Review comment:
   code style: this line is too long
   
   A single line of code should be less than 80 characters; please also fix 
the other similar lines.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

2020-02-16 Thread GitBox
membphis commented on a change in pull request #1121: Batch processor 
implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r379945034
 
 

 ##
 File path: lua/apisix/plugins/batch-processor.lua
 ##
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+__index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+type = "object",
+properties = {
+name = {type = "string", default = "log buffer"},
+max_retry_count = {type = "integer", minimum = 0, default= 0},
+retry_delay = {type = "integer", minimum = 0, default= 1},
+buffer_duration = {type = "integer", minimum = 1, default= 60}, -- 
maximum age in seconds of the oldest log item in a batch before the batch must 
be transmitted
+inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- 
maximum age in seconds when the buffer will be flushed if inactive
+batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- 
maximum number of entries in a batch before the batch must be transmitted
+}
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+if not hdl then
+core.log.error("failed to create process timer: ", err)
+return
+end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+if premature then
+return
+end
+
+local ok, err = batch_processor.func(batch.entries)
+if ok then
+ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the 
entries", batch_processor.name))
+
+else
+batch.retry_count = batch.retry_count + 1
+if batch.retry_count < batch_processor.max_retry_count then
+core.log.warn(fmt("Batch Processor[%s] failed to process entries: 
", batch_processor.name), err)
+schedule_func_exec(batch_processor, batch_processor.retry_delay, 
batch)
+else
+core.log.error(fmt("Batch Processor[%s] exceeded the 
max_retry_count[%d], dropping the entries", batch_processor.name, 
batch.retry_count))
+end
+end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+if premature then
+return
+end
+
+if now() - batch_processor.last_entry_t >= 
batch_processor.inactive_timeout or
+now() - batch_processor.first_entry_t >= 
batch_processor.buffer_duration then
+ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, 
activating buffer flush", batch_processor.name))
+batch_processor:process_buffer()
+batch_processor.isTimerRunning = false
+return
+end
+
+-- buffer duration did not exceed or the buffer is active, extending the 
timer
+ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", 
batch_processor.name))
+create_buffer_timer(batch_processor)
+end
+
+
+create_buffer_timer = function(batch_processor)
+local hdl, err = timer_at(batch_processor.inactive_timeout, flush_buffer, 
batch_processor)
+if not hdl then
+core.log.error("failed to create buffer timer: ", err)
+return
+end
+batch_processor.isTimerRunning = true
+end
+
+
+function Batch_Processor:new(func, config)
+local ok, err = core.schema.check(schema, config)
+
+if not ok then
+return err
+end
+
+if not(type(func) == "function") then
+return nil, "Invalid argument, arg #1 must be a function"
+end
+
+local batch_processor = {
+func = func,
+buffer_duration = config.buffer_duration,
+inactive_timeout = config.inactive_timeout,
+max_retry_count = config.max_retry_count,
+batch_max_size = config.batch_max_size,
+retry_delay = config.retry_delay,
+name 

[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

2020-02-16 Thread GitBox
membphis commented on a change in pull request #1121: Batch processor 
implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r379900594
 
 

 ##
 File path: lua/apisix/plugins/batch-processor.lua
 ##
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+__index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+type = "object",
+properties = {
+name = {type = "string", default = "log buffer"},
+max_retry_count = {type = "integer", minimum = 0, default= 0},
+retry_delay = {type = "integer", minimum = 0, default= 1},
+buffer_duration = {type = "integer", minimum = 1, default= 60}, -- 
maximum age in seconds of the oldest log item in a batch before the batch must 
be transmitted
+inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- 
maximum age in seconds when the buffer will be flushed if inactive
+batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- 
maximum number of entries in a batch before the batch must be transmitted
+}
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+if not hdl then
+core.log.error("failed to create process timer: ", err)
+return
+end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+if premature then
+return
+end
+
+local ok, err = batch_processor.func(batch.entries)
+if ok then
+ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the 
entries", batch_processor.name))
+
+else
+batch.retry_count = batch.retry_count + 1
+if batch.retry_count < batch_processor.max_retry_count then
+core.log.warn(fmt("Batch Processor[%s] failed to process entries: 
", batch_processor.name), err)
+schedule_func_exec(batch_processor, batch_processor.retry_delay, 
batch)
+else
+core.log.error(fmt("Batch Processor[%s] exceeded the 
max_retry_count[%d], dropping the entries", batch_processor.name, 
batch.retry_count))
+end
+end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+if premature then
+return
+end
+
+if now() - batch_processor.last_entry_t >= 
batch_processor.inactive_timeout or
+now() - batch_processor.first_entry_t >= 
batch_processor.buffer_duration then
+ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, 
activating buffer flush", batch_processor.name))
 
 Review comment:
   @sshniro #1124 has been merged, so you can continue your work now ^_^


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

2020-02-13 Thread GitBox
membphis commented on a change in pull request #1121: Batch processor 
implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r379231789
 
 

 ##
 File path: lua/apisix/plugins/batch-processor.lua
 ##
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+__index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+type = "object",
+properties = {
+name = {type = "string", default = "log buffer"},
+max_retry_count = {type = "integer", minimum = 0, default= 0},
+retry_delay = {type = "integer", minimum = 0, default= 1},
+buffer_duration = {type = "integer", minimum = 1, default= 60}, -- 
maximum age in seconds of the oldest log item in a batch before the batch must 
be transmitted
+inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- 
maximum age in seconds when the buffer will be flushed if inactive
+batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- 
maximum number of entries in a batch before the batch must be transmitted
+}
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+if not hdl then
+core.log.error("failed to create process timer: ", err)
+return
+end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+if premature then
+return
+end
+
+local ok, err = batch_processor.func(batch.entries)
+if ok then
+ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the 
entries", batch_processor.name))
+
+else
+batch.retry_count = batch.retry_count + 1
+if batch.retry_count < batch_processor.max_retry_count then
+core.log.warn(fmt("Batch Processor[%s] failed to process entries: 
", batch_processor.name), err)
+schedule_func_exec(batch_processor, batch_processor.retry_delay, 
batch)
+else
+core.log.error(fmt("Batch Processor[%s] exceeded the 
max_retry_count[%d], dropping the entries", batch_processor.name, 
batch.retry_count))
+end
+end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+if premature then
+return
+end
+
+if now() - batch_processor.last_entry_t >= 
batch_processor.inactive_timeout or
+now() - batch_processor.first_entry_t >= 
batch_processor.buffer_duration then
+ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, 
activating buffer flush", batch_processor.name))
 
 Review comment:
   I have submitted a PR to fix it: 
https://github.com/apache/incubator-apisix/pull/1124
   
   You can rebase your branch after 
https://github.com/apache/incubator-apisix/pull/1124 is merged.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

2020-02-13 Thread GitBox
membphis commented on a change in pull request #1121: Batch processor 
implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r378772964
 
 

 ##
 File path: lua/apisix/plugins/batch-processor.lua
 ##
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+__index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+type = "object",
+properties = {
+name = {type = "string", default = "log buffer"},
+max_retry_count = {type = "integer", minimum = 0, default= 0},
+retry_delay = {type = "integer", minimum = 0, default= 1},
+buffer_duration = {type = "integer", minimum = 1, default= 60}, -- 
maximum age in seconds of the oldest log item in a batch before the batch must 
be transmitted
+inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- 
maximum age in seconds when the buffer will be flushed if inactive
+batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- 
maximum number of entries in a batch before the batch must be transmitted
+}
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+if not hdl then
+core.log.error("failed to create process timer: ", err)
+return
+end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+if premature then
+return
+end
+
+local ok, err = batch_processor.func(batch.entries)
+if ok then
+ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the 
entries", batch_processor.name))
+
+else
+batch.retry_count = batch.retry_count + 1
+if batch.retry_count < batch_processor.max_retry_count then
+core.log.warn(fmt("Batch Processor[%s] failed to process entries: 
", batch_processor.name), err)
+schedule_func_exec(batch_processor, batch_processor.retry_delay, 
batch)
+else
+core.log.error(fmt("Batch Processor[%s] exceeded the 
max_retry_count[%d], dropping the entries", batch_processor.name, 
batch.retry_count))
+end
+end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+if premature then
+return
+end
+
+if now() - batch_processor.last_entry_t >= 
batch_processor.inactive_timeout or
+now() - batch_processor.first_entry_t >= 
batch_processor.buffer_duration then
+ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, 
activating buffer flush", batch_processor.name))
 
 Review comment:
   That is a bug; I will submit a new PR to fix it later.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

2020-02-12 Thread GitBox
membphis commented on a change in pull request #1121: Batch processor 
implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r378637883
 
 

 ##
 File path: lua/apisix/plugins/batch-processor.lua
 ##
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+__index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+type = "object",
+properties = {
+name = {type = "string", default = "log buffer"},
+max_retry_count = {type = "integer", minimum = 0, default= 0},
+retry_delay = {type = "integer", minimum = 0, default= 1},
+buffer_duration = {type = "integer", minimum = 1, default= 60}, -- 
maximum age in seconds of the oldest log item in a batch before the batch must 
be transmitted
+inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- 
maximum age in seconds when the buffer will be flushed if inactive
+batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- 
maximum number of entries in a batch before the batch must be transmitted
+}
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+if not hdl then
+core.log.error("failed to create process timer: ", err)
+return
+end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+if premature then
+return
+end
+
+local ok, err = batch_processor.func(batch.entries)
+if ok then
+ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the 
entries", batch_processor.name))
+
+else
+batch.retry_count = batch.retry_count + 1
+if batch.retry_count < batch_processor.max_retry_count then
+core.log.warn(fmt("Batch Processor[%s] failed to process entries: 
", batch_processor.name), err)
+schedule_func_exec(batch_processor, batch_processor.retry_delay, 
batch)
+else
+core.log.error(fmt("Batch Processor[%s] exceeded the 
max_retry_count[%d], dropping the entries", batch_processor.name, 
batch.retry_count))
+end
+end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
 
 Review comment:
   please remove this useless blank line


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

2020-02-12 Thread GitBox
membphis commented on a change in pull request #1121: Batch processor 
implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r378644301
 
 

 ##
 File path: lua/apisix/plugins/batch-processor.lua
 ##
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+__index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+type = "object",
+properties = {
+name = {type = "string", default = "log buffer"},
+max_retry_count = {type = "integer", minimum = 0, default= 0},
+retry_delay = {type = "integer", minimum = 0, default= 1},
+buffer_duration = {type = "integer", minimum = 1, default= 60}, -- 
maximum age in seconds of the oldest log item in a batch before the batch must 
be transmitted
+inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- 
maximum age in seconds when the buffer will be flushed if inactive
+batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- 
maximum number of entries in a batch before the batch must be transmitted
+}
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+if not hdl then
+core.log.error("failed to create process timer: ", err)
+return
+end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+if premature then
+return
+end
+
+local ok, err = batch_processor.func(batch.entries)
+if ok then
+ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the 
entries", batch_processor.name))
+
+else
+batch.retry_count = batch.retry_count + 1
+if batch.retry_count < batch_processor.max_retry_count then
+core.log.warn(fmt("Batch Processor[%s] failed to process entries: 
", batch_processor.name), err)
+schedule_func_exec(batch_processor, batch_processor.retry_delay, 
batch)
+else
+core.log.error(fmt("Batch Processor[%s] exceeded the 
max_retry_count[%d], dropping the entries", batch_processor.name, 
batch.retry_count))
+end
+end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+if premature then
+return
+end
+
+if now() - batch_processor.last_entry_t >= 
batch_processor.inactive_timeout or
+now() - batch_processor.first_entry_t >= 
batch_processor.buffer_duration then
+ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, 
activating buffer flush", batch_processor.name))
+batch_processor:process_buffer()
+batch_processor.isTimerRunning = false
+return
+end
+
+-- buffer duration did not exceed or the buffer is active, extending the 
timer
+ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", 
batch_processor.name))
+create_buffer_timer(batch_processor)
+end
+
+
+create_buffer_timer = function(batch_processor)
+local hdl, err = timer_at(batch_processor.inactive_timeout, flush_buffer, 
batch_processor)
+if not hdl then
+core.log.error("failed to create buffer timer: ", err)
+return
+end
+batch_processor.isTimerRunning = true
+end
+
+
+function Batch_Processor:new(func, config)
+local ok, err = core.schema.check(schema, config)
+
+if not ok then
+return err
+end
+
+if not(type(func) == "function") then
+return nil, "Invalid argument, arg #1 must be a function"
+end
+
+local batch_processor = {
+func = func,
+buffer_duration = config.buffer_duration,
+inactive_timeout = config.inactive_timeout,
+max_retry_count = config.max_retry_count,
+batch_max_size = config.batch_max_size,
+retry_delay = config.retry_delay,
+name 

[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

2020-02-12 Thread GitBox
membphis commented on a change in pull request #1121: Batch processor 
implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r378637750
 
 

 ##
 File path: lua/apisix/plugins/batch-processor.lua
 ##
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+__index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+type = "object",
+properties = {
+name = {type = "string", default = "log buffer"},
+max_retry_count = {type = "integer", minimum = 0, default= 0},
+retry_delay = {type = "integer", minimum = 0, default= 1},
+buffer_duration = {type = "integer", minimum = 1, default= 60}, -- 
maximum age in seconds of the oldest log item in a batch before the batch must 
be transmitted
+inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- 
maximum age in seconds when the buffer will be flushed if inactive
+batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- 
maximum number of entries in a batch before the batch must be transmitted
+}
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+if not hdl then
+core.log.error("failed to create process timer: ", err)
+return
+end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+if premature then
+return
+end
+
+local ok, err = batch_processor.func(batch.entries)
+if ok then
+ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the 
entries", batch_processor.name))
+
+else
+batch.retry_count = batch.retry_count + 1
+if batch.retry_count < batch_processor.max_retry_count then
+core.log.warn(fmt("Batch Processor[%s] failed to process entries: 
", batch_processor.name), err)
+schedule_func_exec(batch_processor, batch_processor.retry_delay, 
batch)
+else
+core.log.error(fmt("Batch Processor[%s] exceeded the 
max_retry_count[%d], dropping the entries", batch_processor.name, 
batch.retry_count))
+end
+end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+if premature then
+return
+end
+
+if now() - batch_processor.last_entry_t >= 
batch_processor.inactive_timeout or
+now() - batch_processor.first_entry_t >= 
batch_processor.buffer_duration then
+ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, 
activating buffer flush", batch_processor.name))
 
 Review comment:
   change to `core.log.debug(...)`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

2020-02-12 Thread GitBox
membphis commented on a change in pull request #1121: Batch processor 
implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r378637838
 
 

 ##
 File path: lua/apisix/plugins/batch-processor.lua
 ##
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+__index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+type = "object",
+properties = {
+name = {type = "string", default = "log buffer"},
+max_retry_count = {type = "integer", minimum = 0, default= 0},
+retry_delay = {type = "integer", minimum = 0, default= 1},
+buffer_duration = {type = "integer", minimum = 1, default= 60}, -- 
maximum age in seconds of the oldest log item in a batch before the batch must 
be transmitted
+inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- 
maximum age in seconds when the buffer will be flushed if inactive
+batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- 
maximum number of entries in a batch before the batch must be transmitted
+}
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+if not hdl then
+core.log.error("failed to create process timer: ", err)
+return
+end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+if premature then
+return
+end
+
+local ok, err = batch_processor.func(batch.entries)
+if ok then
+ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the 
entries", batch_processor.name))
+
+else
+batch.retry_count = batch.retry_count + 1
+if batch.retry_count < batch_processor.max_retry_count then
+core.log.warn(fmt("Batch Processor[%s] failed to process entries: 
", batch_processor.name), err)
+schedule_func_exec(batch_processor, batch_processor.retry_delay, 
batch)
+else
+core.log.error(fmt("Batch Processor[%s] exceeded the 
max_retry_count[%d], dropping the entries", batch_processor.name, 
batch.retry_count))
+end
+end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+if premature then
+return
+end
+
+if now() - batch_processor.last_entry_t >= 
batch_processor.inactive_timeout or
+now() - batch_processor.first_entry_t >= 
batch_processor.buffer_duration then
+ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, 
activating buffer flush", batch_processor.name))
+batch_processor:process_buffer()
+batch_processor.isTimerRunning = false
+return
+end
+
+-- buffer duration did not exceed or the buffer is active, extending the 
timer
+ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", 
batch_processor.name))
 
 Review comment:
   Ditto: please use `core.log.debug(...)` here instead of `ngx_log(DEBUG, ...)`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

2020-02-12 Thread GitBox
membphis commented on a change in pull request #1121: Batch processor 
implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r378643680
 
 

 ##
 File path: lua/apisix/plugins/batch-processor.lua
 ##
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+__index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+type = "object",
+properties = {
+name = {type = "string", default = "log buffer"},
+max_retry_count = {type = "integer", minimum = 0, default= 0},
+retry_delay = {type = "integer", minimum = 0, default= 1},
+buffer_duration = {type = "integer", minimum = 1, default= 60}, -- 
maximum age in seconds of the oldest log item in a batch before the batch must 
be transmitted
+inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- 
maximum age in seconds when the buffer will be flushed if inactive
+batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- 
maximum number of entries in a batch before the batch must be transmitted
+}
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+if not hdl then
+core.log.error("failed to create process timer: ", err)
+return
+end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+if premature then
+return
+end
+
+local ok, err = batch_processor.func(batch.entries)
+if ok then
+ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the 
entries", batch_processor.name))
+
+else
+batch.retry_count = batch.retry_count + 1
+if batch.retry_count < batch_processor.max_retry_count then
+core.log.warn(fmt("Batch Processor[%s] failed to process entries: 
", batch_processor.name), err)
+schedule_func_exec(batch_processor, batch_processor.retry_delay, 
batch)
+else
+core.log.error(fmt("Batch Processor[%s] exceeded the 
max_retry_count[%d], dropping the entries", batch_processor.name, 
batch.retry_count))
+end
+end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+if premature then
+return
+end
+
+if now() - batch_processor.last_entry_t >= 
batch_processor.inactive_timeout or
+now() - batch_processor.first_entry_t >= 
batch_processor.buffer_duration then
+ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, 
activating buffer flush", batch_processor.name))
+batch_processor:process_buffer()
+batch_processor.isTimerRunning = false
+return
+end
+
+-- buffer duration did not exceed or the buffer is active, extending the 
timer
+ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", 
batch_processor.name))
+create_buffer_timer(batch_processor)
+end
+
+
+create_buffer_timer = function(batch_processor)
+local hdl, err = timer_at(batch_processor.inactive_timeout, flush_buffer, 
batch_processor)
+if not hdl then
+core.log.error("failed to create buffer timer: ", err)
+return
+end
+batch_processor.isTimerRunning = true
+end
+
+
+function Batch_Processor:new(func, config)
+local ok, err = core.schema.check(schema, config)
+
+if not ok then
+return err
+end
+
+if not(type(func) == "function") then
+return nil, "Invalid argument, arg #1 must be a function"
+end
+
+local batch_processor = {
+func = func,
+buffer_duration = config.buffer_duration,
+inactive_timeout = config.inactive_timeout,
+max_retry_count = config.max_retry_count,
+batch_max_size = config.batch_max_size,
+retry_delay = config.retry_delay,
+name 

[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

2020-02-12 Thread GitBox
membphis commented on a change in pull request #1121: Batch processor 
implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r378638012
 
 

 ##
 File path: lua/apisix/plugins/batch-processor.lua
 ##
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+__index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+type = "object",
+properties = {
+name = {type = "string", default = "log buffer"},
+max_retry_count = {type = "integer", minimum = 0, default= 0},
+retry_delay = {type = "integer", minimum = 0, default= 1},
+buffer_duration = {type = "integer", minimum = 1, default= 60}, -- 
maximum age in seconds of the oldest log item in a batch before the batch must 
be transmitted
+inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- 
maximum age in seconds when the buffer will be flushed if inactive
+batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- 
maximum number of entries in a batch before the batch must be transmitted
+}
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+if not hdl then
+core.log.error("failed to create process timer: ", err)
+return
+end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+if premature then
+return
+end
+
+local ok, err = batch_processor.func(batch.entries)
+if ok then
+ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the 
entries", batch_processor.name))
+
+else
+batch.retry_count = batch.retry_count + 1
+if batch.retry_count < batch_processor.max_retry_count then
+core.log.warn(fmt("Batch Processor[%s] failed to process entries: 
", batch_processor.name), err)
+schedule_func_exec(batch_processor, batch_processor.retry_delay, 
batch)
+else
+core.log.error(fmt("Batch Processor[%s] exceeded the 
max_retry_count[%d], dropping the entries", batch_processor.name, 
batch.retry_count))
+end
+end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+if premature then
+return
+end
+
+if now() - batch_processor.last_entry_t >= 
batch_processor.inactive_timeout or
+now() - batch_processor.first_entry_t >= 
batch_processor.buffer_duration then
+ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, 
activating buffer flush", batch_processor.name))
+batch_processor:process_buffer()
+batch_processor.isTimerRunning = false
+return
+end
+
+-- buffer duration did not exceed or the buffer is active, extending the 
timer
+ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", 
batch_processor.name))
+create_buffer_timer(batch_processor)
+end
+
+
+create_buffer_timer = function(batch_processor)
 
 Review comment:
   Code style: please use the `local function create_buffer_timer()` form here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

2020-02-12 Thread GitBox
membphis commented on a change in pull request #1121: Batch processor 
implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r378643354
 
 

 ##
 File path: lua/apisix/plugins/batch-processor.lua
 ##
 @@ -0,0 +1,179 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local setmetatable = setmetatable
+local timer_at = ngx.timer.at
+local remove = table.remove
+local fmt = string.format
+local ngx_log = ngx.log
+local DEBUG = ngx.DEBUG
+local table = table
+local now = ngx.now
+local type = type
+local Batch_Processor = {}
+local Batch_Processor_mt = {
+__index = Batch_Processor
+}
+local execute_func
+local create_buffer_timer
+
+
+local schema = {
+type = "object",
+properties = {
+name = {type = "string", default = "log buffer"},
+max_retry_count = {type = "integer", minimum = 0, default= 0},
+retry_delay = {type = "integer", minimum = 0, default= 1},
+buffer_duration = {type = "integer", minimum = 1, default= 60}, -- 
maximum age in seconds of the oldest log item in a batch before the batch must 
be transmitted
+inactive_timeout = {type = "integer", minimum = 1, default= 5}, -- 
maximum age in seconds when the buffer will be flushed if inactive
+batch_max_size = {type = "integer", minimum = 1, default= 1000}, -- 
maximum number of entries in a batch before the batch must be transmitted
+}
+}
+
+
+local function schedule_func_exec(batch_processor, delay, batch)
+local hdl, err = timer_at(delay, execute_func, batch_processor, batch)
+if not hdl then
+core.log.error("failed to create process timer: ", err)
+return
+end
+end
+
+
+execute_func = function(premature, batch_processor, batch)
+if premature then
+return
+end
+
+local ok, err = batch_processor.func(batch.entries)
+if ok then
+ngx_log(DEBUG, fmt("Batch Processor[%s] successfully processed the 
entries", batch_processor.name))
+
+else
+batch.retry_count = batch.retry_count + 1
+if batch.retry_count < batch_processor.max_retry_count then
+core.log.warn(fmt("Batch Processor[%s] failed to process entries: 
", batch_processor.name), err)
+schedule_func_exec(batch_processor, batch_processor.retry_delay, 
batch)
+else
+core.log.error(fmt("Batch Processor[%s] exceeded the 
max_retry_count[%d], dropping the entries", batch_processor.name, 
batch.retry_count))
+end
+end
+end
+
+
+local function flush_buffer(premature, batch_processor)
+
+if premature then
+return
+end
+
+if now() - batch_processor.last_entry_t >= 
batch_processor.inactive_timeout or
+now() - batch_processor.first_entry_t >= 
batch_processor.buffer_duration then
+ngx_log(DEBUG, fmt("BatchProcessor[%s] buffer duration exceeded, 
activating buffer flush", batch_processor.name))
+batch_processor:process_buffer()
+batch_processor.isTimerRunning = false
+return
+end
+
+-- buffer duration did not exceed or the buffer is active, extending the 
timer
+ngx_log(DEBUG, fmt("BatchProcessor[%s] extending buffer timer", 
batch_processor.name))
+create_buffer_timer(batch_processor)
+end
+
+
+create_buffer_timer = function(batch_processor)
+local hdl, err = timer_at(batch_processor.inactive_timeout, flush_buffer, 
batch_processor)
+if not hdl then
+core.log.error("failed to create buffer timer: ", err)
+return
+end
+batch_processor.isTimerRunning = true
+end
+
+
+function Batch_Processor:new(func, config)
+local ok, err = core.schema.check(schema, config)
+
+if not ok then
+return err
+end
+
+if not(type(func) == "function") then
+return nil, "Invalid argument, arg #1 must be a function"
+end
+
+local batch_processor = {
+func = func,
+buffer_duration = config.buffer_duration,
+inactive_timeout = config.inactive_timeout,
+max_retry_count = config.max_retry_count,
+batch_max_size = config.batch_max_size,
+retry_delay = config.retry_delay,
+name 

[GitHub] [incubator-apisix] membphis commented on a change in pull request #1121: Batch processor implementation to aggregate logs in batch

2020-02-12 Thread GitBox
membphis commented on a change in pull request #1121: Batch processor 
implementation to aggregate logs in batch 
URL: https://github.com/apache/incubator-apisix/pull/1121#discussion_r378644474
 
 

 ##
 File path: t/plugin/batch-processor-refac.t
 ##
 @@ -0,0 +1,374 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+log_level('debug');
+repeat_each(1);
+no_long_string();
+no_root_location();
+run_tests;
+
+__DATA__
+
+=== TEST 1: wrong configuration parameters
+--- config
+location /t {
+content_by_lua_block {
+local Batch = require("apisix.plugins.batch-processor")
+local func_to_send = function(elements)
+return true
+end
+
+local config = {
+max_retry_count  = 2,
+batch_max_size = 1,
+process_delay  = 0,
+retry_delay  = 0,
+}
+
+local log_buffer, err = Batch:new("", config)
+
+if log_buffer then
+log_buffer:push({hello='world'})
+ngx.say("done")
+end
+
+if not log_buffer then
+ngx.say("failed")
+end
+
+}
+}
+--- request
+GET /t
+--- response_body
+failed
+--- wait: 0.5
+
+
+=== TEST 2: sanity
+--- config
+location /t {
+content_by_lua_block {
+local Batch = require("apisix.plugins.batch-processor")
+local func_to_send = function(elements)
+return true
+end
+
+local config = {
+max_retry_count  = 2,
+batch_max_size = 1,
+process_delay  = 0,
+retry_delay  = 0,
+}
+
+local log_buffer, err = Batch:new(func_to_send, config)
+
+if not log_buffer then
+ngx.say(err)
+end
+
+log_buffer:push({hello='world'})
+ngx.say("done")
+}
+}
+--- request
+GET /t
+--- response_body
+done
+--- error_log
+Batch Processor[log buffer] successfully processed the entries
+--- wait: 0.5
+
+=== TEST 3: batch processor timeout exceeded
 
 Review comment:
   
https://github.com/apache/incubator-apisix/blob/fda20d99d55d91905622b9d780e4dce79d128e76/Contributing.md#check-code-style-and-test-case-style
   
   Please check your code style and test case style against the guide linked above.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services