This is an automated email from the ASF dual-hosted git repository.
ashishtiwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new 2c041a39b feat: add max pending entries to all logger plugins (#12709)
2c041a39b is described below
commit 2c041a39bf201abfd3743acec0b6fd9b8e77bf01
Author: Ashish Tiwari <[email protected]>
AuthorDate: Thu Oct 30 11:33:42 2025 +0530
feat: add max pending entries to all logger plugins (#12709)
---
apisix/plugins/clickhouse-logger.lua | 15 ++-
apisix/plugins/elasticsearch-logger.lua | 17 ++-
apisix/plugins/google-cloud-logging.lua | 16 ++-
apisix/plugins/http-logger.lua | 15 ++-
apisix/plugins/loki-logger.lua | 15 ++-
apisix/plugins/rocketmq-logger.lua | 15 ++-
apisix/plugins/skywalking-logger.lua | 15 ++-
apisix/plugins/splunk-hec-logging.lua | 14 ++-
apisix/plugins/tcp-logger.lua | 15 ++-
apisix/plugins/tencent-cloud-cls.lua | 16 ++-
apisix/plugins/udp-logger.lua | 15 ++-
docs/en/latest/plugins/clickhouse-logger.md | 1 +
docs/en/latest/plugins/elasticsearch-logger.md | 1 +
docs/en/latest/plugins/google-cloud-logging.md | 1 +
docs/en/latest/plugins/http-logger.md | 1 +
docs/en/latest/plugins/kafka-logger.md | 1 +
docs/en/latest/plugins/loki-logger.md | 1 +
docs/en/latest/plugins/rocketmq-logger.md | 1 +
docs/en/latest/plugins/skywalking-logger.md | 1 +
docs/en/latest/plugins/splunk-hec-logging.md | 1 +
docs/en/latest/plugins/tcp-logger.md | 1 +
docs/en/latest/plugins/tencent-cloud-cls.md | 1 +
docs/en/latest/plugins/udp-logger.md | 1 +
docs/zh/latest/plugins/clickhouse-logger.md | 1 +
docs/zh/latest/plugins/elasticsearch-logger.md | 1 +
docs/zh/latest/plugins/google-cloud-logging.md | 1 +
docs/zh/latest/plugins/http-logger.md | 1 +
docs/zh/latest/plugins/kafka-logger.md | 1 +
docs/zh/latest/plugins/loki-logger.md | 1 +
docs/zh/latest/plugins/rocketmq-logger.md | 1 +
docs/zh/latest/plugins/skywalking-logger.md | 1 +
docs/zh/latest/plugins/splunk-hec-logging.md | 1 +
docs/zh/latest/plugins/tcp-logger.md | 1 +
docs/zh/latest/plugins/tencent-cloud-cls.md | 1 +
docs/zh/latest/plugins/udp-logger.md | 1 +
t/plugin/clickhouse-logger.t | 87 ++++++++++++++
t/plugin/elasticsearch-logger2.t | 121 +++++++++++++++++++
t/plugin/google-cloud-logging3.t | 153 +++++++++++++++++++++++++
t/plugin/http-logger3.t | 121 +++++++++++++++++++
t/plugin/loki-logger2.t | 114 ++++++++++++++++++
t/plugin/skywalking-logger2.t | 114 ++++++++++++++++++
t/plugin/splunk-hec-logging2.t | 119 +++++++++++++++++++
42 files changed, 988 insertions(+), 33 deletions(-)
diff --git a/apisix/plugins/clickhouse-logger.lua
b/apisix/plugins/clickhouse-logger.lua
index 793a8d462..b3e791f07 100644
--- a/apisix/plugins/clickhouse-logger.lua
+++ b/apisix/plugins/clickhouse-logger.lua
@@ -17,6 +17,7 @@
local bp_manager_mod = require("apisix.utils.batch-processor-manager")
local log_util = require("apisix.utils.log-util")
+local plugin = require("apisix.plugin")
local core = require("apisix.core")
local http = require("resty.http")
local url = require("net.url")
@@ -71,7 +72,12 @@ local metadata_schema = {
properties = {
log_format = {
type = "object"
- }
+ },
+ max_pending_entries = {
+ type = "integer",
+ description = "maximum number of pending entries in the batch
processor",
+ minimum = 1,
+ },
},
}
@@ -174,9 +180,12 @@ end
function _M.log(conf, ctx)
+ local metadata = plugin.plugin_metadata(plugin_name)
+ local max_pending_entries = metadata and metadata.value and
+ metadata.value.max_pending_entries or nil
local entry = log_util.get_log_entry(plugin_name, conf, ctx)
- if batch_processor_manager:add_entry(conf, entry) then
+ if batch_processor_manager:add_entry(conf, entry, max_pending_entries) then
return
end
@@ -201,7 +210,7 @@ function _M.log(conf, ctx)
return send_http_data(conf, data)
end
- batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
+ batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func,
max_pending_entries)
end
diff --git a/apisix/plugins/elasticsearch-logger.lua
b/apisix/plugins/elasticsearch-logger.lua
index 09dcbd795..3d9da16bb 100644
--- a/apisix/plugins/elasticsearch-logger.lua
+++ b/apisix/plugins/elasticsearch-logger.lua
@@ -19,7 +19,7 @@ local core = require("apisix.core")
local http = require("resty.http")
local log_util = require("apisix.utils.log-util")
local bp_manager_mod = require("apisix.utils.batch-processor-manager")
-
+local plugin = require("apisix.plugin")
local ngx = ngx
local str_format = core.string.format
local math_random = math.random
@@ -104,7 +104,12 @@ local metadata_schema = {
properties = {
log_format = {
type = "object"
- }
+ },
+ max_pending_entries = {
+ type = "integer",
+ description = "maximum number of pending entries in the batch
processor",
+ minimum = 1,
+ },
},
}
@@ -264,9 +269,12 @@ function _M.access(conf)
end
function _M.log(conf, ctx)
+ local metadata = plugin.plugin_metadata(plugin_name)
+ local max_pending_entries = metadata and metadata.value and
+ metadata.value.max_pending_entries or nil
local entry = get_logger_entry(conf, ctx)
- if batch_processor_manager:add_entry(conf, entry) then
+ if batch_processor_manager:add_entry(conf, entry, max_pending_entries) then
return
end
@@ -274,7 +282,8 @@ function _M.log(conf, ctx)
return send_to_elasticsearch(conf, entries)
end
- batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx,
process)
+ batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx,
+ process,
max_pending_entries)
end
diff --git a/apisix/plugins/google-cloud-logging.lua
b/apisix/plugins/google-cloud-logging.lua
index 62ca991c0..09d6eb5f9 100644
--- a/apisix/plugins/google-cloud-logging.lua
+++ b/apisix/plugins/google-cloud-logging.lua
@@ -16,6 +16,7 @@
--
local core = require("apisix.core")
+local plugin = require("apisix.plugin")
local tostring = tostring
local http = require("resty.http")
local log_util = require("apisix.utils.log-util")
@@ -110,7 +111,12 @@ local metadata_schema = {
properties = {
log_format = {
type = "object"
- }
+ },
+ max_pending_entries = {
+ type = "integer",
+ description = "maximum number of pending entries in the batch
processor",
+ minimum = 1,
+ },
},
}
@@ -241,6 +247,9 @@ end
function _M.log(conf, ctx)
+ local metadata = plugin.plugin_metadata(plugin_name)
+ local max_pending_entries = metadata and metadata.value and
+ metadata.value.max_pending_entries or nil
local oauth, err = core.lrucache.plugin_ctx(lrucache, ctx, nil,
create_oauth_object, conf)
if not oauth then
@@ -250,7 +259,7 @@ function _M.log(conf, ctx)
local entry = get_logger_entry(conf, ctx, oauth)
- if batch_processor_manager:add_entry(conf, entry) then
+ if batch_processor_manager:add_entry(conf, entry, max_pending_entries) then
return
end
@@ -258,7 +267,8 @@ function _M.log(conf, ctx)
return send_to_google(oauth, entries)
end
- batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx,
process)
+ batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx,
+ process,
max_pending_entries)
end
diff --git a/apisix/plugins/http-logger.lua b/apisix/plugins/http-logger.lua
index 44f84acc9..d3259e5db 100644
--- a/apisix/plugins/http-logger.lua
+++ b/apisix/plugins/http-logger.lua
@@ -16,6 +16,7 @@
--
local bp_manager_mod = require("apisix.utils.batch-processor-manager")
+local plugin = require("apisix.plugin")
local log_util = require("apisix.utils.log-util")
local core = require("apisix.core")
local http = require("resty.http")
@@ -63,7 +64,12 @@ local metadata_schema = {
properties = {
log_format = {
type = "object"
- }
+ },
+ max_pending_entries = {
+ type = "integer",
+ description = "maximum number of pending entries in the batch
processor",
+ minimum = 1,
+ },
},
}
@@ -168,13 +174,16 @@ end
function _M.log(conf, ctx)
+ local metadata = plugin.plugin_metadata(plugin_name)
+ local max_pending_entries = metadata and metadata.value and
+ metadata.value.max_pending_entries or nil
local entry = log_util.get_log_entry(plugin_name, conf, ctx)
if not entry.route_id then
entry.route_id = "no-matched"
end
- if batch_processor_manager:add_entry(conf, entry) then
+ if batch_processor_manager:add_entry(conf, entry, max_pending_entries) then
return
end
@@ -216,7 +225,7 @@ function _M.log(conf, ctx)
return send_http_data(conf, data)
end
- batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
+ batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func,
max_pending_entries)
end
diff --git a/apisix/plugins/loki-logger.lua b/apisix/plugins/loki-logger.lua
index 6ff5311eb..bb7c614f1 100644
--- a/apisix/plugins/loki-logger.lua
+++ b/apisix/plugins/loki-logger.lua
@@ -18,6 +18,7 @@
local bp_manager_mod = require("apisix.utils.batch-processor-manager")
local log_util = require("apisix.utils.log-util")
local core = require("apisix.core")
+local plugin = require("apisix.plugin")
local http = require("resty.http")
local new_tab = require("table.new")
@@ -115,7 +116,12 @@ local metadata_schema = {
properties = {
log_format = {
type = "object"
- }
+ },
+ max_pending_entries = {
+ type = "integer",
+ description = "maximum number of pending entries in the batch
processor",
+ minimum = 1,
+ },
},
}
@@ -193,6 +199,9 @@ end
function _M.log(conf, ctx)
+ local metadata = plugin.plugin_metadata(plugin_name)
+ local max_pending_entries = metadata and metadata.value and
+ metadata.value.max_pending_entries or nil
local entry = log_util.get_log_entry(plugin_name, conf, ctx)
if not entry.route_id then
@@ -205,7 +214,7 @@ function _M.log(conf, ctx)
-- and then add 6 zeros by string concatenation
entry.loki_log_time = tostring(ngx.req.start_time() * 1000) .. "000000"
- if batch_processor_manager:add_entry(conf, entry) then
+ if batch_processor_manager:add_entry(conf, entry, max_pending_entries) then
return
end
@@ -244,7 +253,7 @@ function _M.log(conf, ctx)
return send_http_data(conf, data)
end
- batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
+ batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func,
max_pending_entries)
end
diff --git a/apisix/plugins/rocketmq-logger.lua
b/apisix/plugins/rocketmq-logger.lua
index 2f0cd5b4b..9dd5b2a93 100644
--- a/apisix/plugins/rocketmq-logger.lua
+++ b/apisix/plugins/rocketmq-logger.lua
@@ -15,6 +15,7 @@
-- limitations under the License.
--
local core = require("apisix.core")
+local plugin = require("apisix.plugin")
local log_util = require("apisix.utils.log-util")
local producer = require ("resty.rocketmq.producer")
local acl_rpchook = require("resty.rocketmq.acl_rpchook")
@@ -77,7 +78,12 @@ local metadata_schema = {
properties = {
log_format = {
type = "object"
- }
+ },
+ max_pending_entries = {
+ type = "integer",
+ description = "maximum number of pending entries in the batch
processor",
+ minimum = 1,
+ },
},
}
@@ -138,6 +144,9 @@ end
function _M.log(conf, ctx)
+ local metadata = plugin.plugin_metadata(plugin_name)
+ local max_pending_entries = metadata and metadata.value and
+ metadata.value.max_pending_entries or nil
local entry
if conf.meta_format == "origin" then
entry = log_util.get_req_original(ctx, conf)
@@ -145,7 +154,7 @@ function _M.log(conf, ctx)
entry = log_util.get_log_entry(plugin_name, conf, ctx)
end
- if batch_processor_manager:add_entry(conf, entry) then
+ if batch_processor_manager:add_entry(conf, entry, max_pending_entries) then
return
end
@@ -184,7 +193,7 @@ function _M.log(conf, ctx)
return send_rocketmq_data(conf, data, prod)
end
- batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
+ batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func,
max_pending_entries)
end
diff --git a/apisix/plugins/skywalking-logger.lua
b/apisix/plugins/skywalking-logger.lua
index 8a7e309cd..c7c8dcae8 100644
--- a/apisix/plugins/skywalking-logger.lua
+++ b/apisix/plugins/skywalking-logger.lua
@@ -16,6 +16,7 @@
--
local bp_manager_mod = require("apisix.utils.batch-processor-manager")
+local plugin = require("apisix.plugin")
local log_util = require("apisix.utils.log-util")
local core = require("apisix.core")
local http = require("resty.http")
@@ -64,7 +65,12 @@ local metadata_schema = {
properties = {
log_format = {
type = "object"
- }
+ },
+ max_pending_entries = {
+ type = "integer",
+ description = "maximum number of pending entries in the batch
processor",
+ minimum = 1,
+ },
},
}
@@ -139,6 +145,9 @@ end
function _M.log(conf, ctx)
+ local metadata = plugin.plugin_metadata(plugin_name)
+ local max_pending_entries = metadata and metadata.value and
+ metadata.value.max_pending_entries or nil
local log_body = log_util.get_log_entry(plugin_name, conf, ctx)
local trace_context
local sw_header = ngx.req.get_headers()["sw8"]
@@ -173,7 +182,7 @@ function _M.log(conf, ctx)
endpoint = ctx.var.uri,
}
- if batch_processor_manager:add_entry(conf, entry) then
+ if batch_processor_manager:add_entry(conf, entry, max_pending_entries) then
return
end
@@ -187,7 +196,7 @@ function _M.log(conf, ctx)
return send_http_data(conf, data)
end
- batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
+ batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func,
max_pending_entries)
end
diff --git a/apisix/plugins/splunk-hec-logging.lua
b/apisix/plugins/splunk-hec-logging.lua
index c93b2736e..2024cc9fe 100644
--- a/apisix/plugins/splunk-hec-logging.lua
+++ b/apisix/plugins/splunk-hec-logging.lua
@@ -21,6 +21,7 @@ local ngx_now = ngx.now
local http = require("resty.http")
local log_util = require("apisix.utils.log-util")
local bp_manager_mod = require("apisix.utils.batch-processor-manager")
+local plugin = require("apisix.plugin")
local table_insert = core.table.insert
local table_concat = core.table.concat
local ipairs = ipairs
@@ -75,6 +76,11 @@ local metadata_schema = {
properties = {
log_format = {
type = "object"
+ },
+ max_pending_entries = {
+ type = "integer",
+ description = "maximum number of pending entries in the batch
processor",
+ minimum = 1,
}
},
}
@@ -169,9 +175,12 @@ end
function _M.log(conf, ctx)
+ local metadata = plugin.plugin_metadata(plugin_name)
+ local max_pending_entries = metadata and metadata.value and
+ metadata.value.max_pending_entries or nil
local entry = get_logger_entry(conf, ctx)
- if batch_processor_manager:add_entry(conf, entry) then
+ if batch_processor_manager:add_entry(conf, entry, max_pending_entries) then
return
end
@@ -179,7 +188,8 @@ function _M.log(conf, ctx)
return send_to_splunk(conf, entries)
end
- batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx,
process)
+ batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx,
+ process,
max_pending_entries)
end
diff --git a/apisix/plugins/tcp-logger.lua b/apisix/plugins/tcp-logger.lua
index 7482fe515..9b3df668e 100644
--- a/apisix/plugins/tcp-logger.lua
+++ b/apisix/plugins/tcp-logger.lua
@@ -16,6 +16,7 @@
--
local core = require("apisix.core")
local log_util = require("apisix.utils.log-util")
+local plugin = require("apisix.plugin")
local bp_manager_mod = require("apisix.utils.batch-processor-manager")
local plugin_name = "tcp-logger"
local tostring = tostring
@@ -58,7 +59,12 @@ local metadata_schema = {
properties = {
log_format = {
type = "object"
- }
+ },
+ max_pending_entries = {
+ type = "integer",
+ description = "maximum number of pending entries in the batch
processor",
+ minimum = 1,
+ },
},
}
@@ -132,9 +138,12 @@ end
function _M.log(conf, ctx)
+ local metadata = plugin.plugin_metadata(plugin_name)
+ local max_pending_entries = metadata and metadata.value and
+ metadata.value.max_pending_entries or nil
local entry = log_util.get_log_entry(plugin_name, conf, ctx)
- if batch_processor_manager:add_entry(conf, entry) then
+ if batch_processor_manager:add_entry(conf, entry, max_pending_entries) then
return
end
@@ -154,7 +163,7 @@ function _M.log(conf, ctx)
return send_tcp_data(conf, data)
end
- batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
+ batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func,
max_pending_entries)
end
diff --git a/apisix/plugins/tencent-cloud-cls.lua
b/apisix/plugins/tencent-cloud-cls.lua
index 38fe56503..f8dac160e 100644
--- a/apisix/plugins/tencent-cloud-cls.lua
+++ b/apisix/plugins/tencent-cloud-cls.lua
@@ -16,6 +16,7 @@
--
local core = require("apisix.core")
+local plugin = require("apisix.plugin")
local log_util = require("apisix.utils.log-util")
local bp_manager_mod = require("apisix.utils.batch-processor-manager")
local cls_sdk = require("apisix.plugins.tencent-cloud-cls.cls-sdk")
@@ -67,7 +68,12 @@ local metadata_schema = {
properties = {
log_format = {
type = "object"
- }
+ },
+ max_pending_entries = {
+ type = "integer",
+ description = "maximum number of pending entries in the batch
processor",
+ minimum = 1,
+ },
},
}
@@ -112,6 +118,9 @@ end
function _M.log(conf, ctx)
+ local metadata = plugin.plugin_metadata(plugin_name)
+ local max_pending_entries = metadata and metadata.value and
+ metadata.value.max_pending_entries or nil
-- sample if set
if not ctx.cls_sample then
core.log.debug("cls not sampled, skip log")
@@ -126,7 +135,7 @@ function _M.log(conf, ctx)
end
end
- if batch_processor_manager:add_entry(conf, entry) then
+ if batch_processor_manager:add_entry(conf, entry, max_pending_entries) then
return
end
@@ -139,7 +148,8 @@ function _M.log(conf, ctx)
return sdk:send_to_cls(entries)
end
- batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx,
process)
+ batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx,
+ process,
max_pending_entries)
end
diff --git a/apisix/plugins/udp-logger.lua b/apisix/plugins/udp-logger.lua
index 75e8bba31..a2333b0b5 100644
--- a/apisix/plugins/udp-logger.lua
+++ b/apisix/plugins/udp-logger.lua
@@ -16,6 +16,7 @@
--
local core = require("apisix.core")
local log_util = require("apisix.utils.log-util")
+local plugin = require("apisix.plugin")
local bp_manager_mod = require("apisix.utils.batch-processor-manager")
local plugin_name = "udp-logger"
local tostring = tostring
@@ -56,7 +57,12 @@ local metadata_schema = {
properties = {
log_format = {
type = "object"
- }
+ },
+ max_pending_entries = {
+ type = "integer",
+ description = "maximum number of pending entries in the batch
processor",
+ minimum = 1,
+ },
},
}
@@ -117,9 +123,12 @@ end
function _M.log(conf, ctx)
+ local metadata = plugin.plugin_metadata(plugin_name)
+ local max_pending_entries = metadata and metadata.value and
+ metadata.value.max_pending_entries or nil
local entry = log_util.get_log_entry(plugin_name, conf, ctx)
- if batch_processor_manager:add_entry(conf, entry) then
+ if batch_processor_manager:add_entry(conf, entry, max_pending_entries) then
return
end
@@ -139,7 +148,7 @@ function _M.log(conf, ctx)
return send_udp_data(conf, data)
end
- batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
+ batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func,
max_pending_entries)
end
return _M
diff --git a/docs/en/latest/plugins/clickhouse-logger.md
b/docs/en/latest/plugins/clickhouse-logger.md
index 023f9e920..0d96f6c05 100644
--- a/docs/en/latest/plugins/clickhouse-logger.md
+++ b/docs/en/latest/plugins/clickhouse-logger.md
@@ -104,6 +104,7 @@ You can also set the format of the logs by configuring the
Plugin metadata. The
| Name | Type | Required | Default
| Description
|
| ---------- | ------ | -------- |
----------------------------------------------------------------------------- |
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
| log_format | object | False | | Log format declared as key value pairs in
JSON format. Values only support strings. [APISIX](../apisix-variable.md) or
[Nginx](http://nginx.org/en/docs/varindex.html) variables can be used by
prefixing the string with `$`. |
+| max_pending_entries | integer | False | | Maximum number of pending entries
that can be buffered in batch processor before it starts dropping them. |
:::info IMPORTANT
diff --git a/docs/en/latest/plugins/elasticsearch-logger.md
b/docs/en/latest/plugins/elasticsearch-logger.md
index 80ea07601..8c6ecbfb9 100644
--- a/docs/en/latest/plugins/elasticsearch-logger.md
+++ b/docs/en/latest/plugins/elasticsearch-logger.md
@@ -62,6 +62,7 @@ This Plugin supports using batch processors to aggregate and
process entries (lo
| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| log_format | object | False | | Custom log format in key-value pairs in
JSON format. Support [APISIX variables](../apisix-variable.md) and [NGINX
variables](http://nginx.org/en/docs/varindex.html) in values. |
+| max_pending_entries | integer | False | | Maximum number of pending entries
that can be buffered in batch processor before it starts dropping them. |
## Examples
diff --git a/docs/en/latest/plugins/google-cloud-logging.md
b/docs/en/latest/plugins/google-cloud-logging.md
index 85b972381..2fda43784 100644
--- a/docs/en/latest/plugins/google-cloud-logging.md
+++ b/docs/en/latest/plugins/google-cloud-logging.md
@@ -91,6 +91,7 @@ You can also set the format of the logs by configuring the
Plugin metadata. The
| Name | Type | Required | Default
| Description
|
| ---------- | ------ | -------- |
----------------------------------------------------------------------------- |
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
| log_format | object | False | | Log format declared as key value pairs
in JSON format. Values only support strings. [APISIX](../apisix-variable.md) or
[Nginx](http://nginx.org/en/docs/varindex.html) variables can be used by
prefixing the string with `$`. |
+| max_pending_entries | integer | False | | Maximum number of pending entries
that can be buffered in batch processor before it starts dropping them. |
:::info IMPORTANT
diff --git a/docs/en/latest/plugins/http-logger.md
b/docs/en/latest/plugins/http-logger.md
index d07375b0b..df423a226 100644
--- a/docs/en/latest/plugins/http-logger.md
+++ b/docs/en/latest/plugins/http-logger.md
@@ -105,6 +105,7 @@ You can also set the format of the logs by configuring the
Plugin metadata. The
| Name | Type | Required | Default
| Description
|
| ---------- | ------ | -------- |
----------------------------------------------------------------------------- |
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
| log_format | object | False | | Log format declared as key value pairs
in JSON format. Values only support strings. [APISIX](../apisix-variable.md) or
[Nginx](http://nginx.org/en/docs/varindex.html) variables can be used by
prefixing the string with `$`. |
+| max_pending_entries | integer | False | | Maximum number of pending entries
that can be buffered in batch processor before it starts dropping them. |
:::info IMPORTANT
diff --git a/docs/en/latest/plugins/kafka-logger.md
b/docs/en/latest/plugins/kafka-logger.md
index de8bddd39..e27a2c0cc 100644
--- a/docs/en/latest/plugins/kafka-logger.md
+++ b/docs/en/latest/plugins/kafka-logger.md
@@ -139,6 +139,7 @@ You can also set the format of the logs by configuring the
Plugin metadata. The
| Name | Type | Required | Default
| Description
|
| ---------- | ------ | -------- |
----------------------------------------------------------------------------- |
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
| log_format | object | False | | Log format declared as key value pairs in
JSON format. Values only support strings. [APISIX](../apisix-variable.md) or
[Nginx](http://nginx.org/en/docs/varindex.html) variables can be used by
prefixing the string with `$`. |
+| max_pending_entries | integer | False | | Maximum number of pending entries
that can be buffered in batch processor before it starts dropping them. |
:::info IMPORTANT
diff --git a/docs/en/latest/plugins/loki-logger.md
b/docs/en/latest/plugins/loki-logger.md
index 65a0a86d7..22dade6b1 100644
--- a/docs/en/latest/plugins/loki-logger.md
+++ b/docs/en/latest/plugins/loki-logger.md
@@ -68,6 +68,7 @@ You can also configure log format on a global scale using the
[Plugin Metadata](
| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| log_format | object | False | | Custom log format in key-value pairs in
JSON format. Support [APISIX variables](../apisix-variable.md) and [NGINX
variables](http://nginx.org/en/docs/varindex.html) in values. |
+| max_pending_entries | integer | False | | Maximum number of pending entries
that can be buffered in batch processor before it starts dropping them. |
## Examples
diff --git a/docs/en/latest/plugins/rocketmq-logger.md
b/docs/en/latest/plugins/rocketmq-logger.md
index ff09c668e..1f329c6d3 100644
--- a/docs/en/latest/plugins/rocketmq-logger.md
+++ b/docs/en/latest/plugins/rocketmq-logger.md
@@ -182,6 +182,7 @@ You can also set the format of the logs by configuring the
Plugin metadata. The
| Name | Type | Required | Default
| Description
|
|------------|--------|----------|-------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| log_format | object | False | | Log format declared as key value pairs
in JSON format. Values only support strings. [APISIX](../apisix-variable.md) or
[Nginx](http://nginx.org/en/docs/varindex.html) variables can be used by
prefixing the string with `$`. |
+| max_pending_entries | integer | False | | Maximum number of pending entries
that can be buffered in batch processor before it starts dropping them. |
:::info IMPORTANT
diff --git a/docs/en/latest/plugins/skywalking-logger.md
b/docs/en/latest/plugins/skywalking-logger.md
index bb0532a2a..4541ca5fc 100644
--- a/docs/en/latest/plugins/skywalking-logger.md
+++ b/docs/en/latest/plugins/skywalking-logger.md
@@ -62,6 +62,7 @@ You can also set the format of the logs by configuring the
Plugin metadata. The
| Name | Type | Required | Default
| Description
|
| ---------- | ------ | -------- |
----------------------------------------------------------------------------- |
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
| log_format | object | False | | Custom log format in key-value pairs in
JSON format. Support [APISIX](../apisix-variable.md) or [NGINX
variables](http://nginx.org/en/docs/varindex.html) in values. |
+| max_pending_entries | integer | False | | Maximum number of pending entries
that can be buffered in batch processor before it starts dropping them. |
## Examples
diff --git a/docs/en/latest/plugins/splunk-hec-logging.md
b/docs/en/latest/plugins/splunk-hec-logging.md
index 565f84e4b..a6bb44dbb 100644
--- a/docs/en/latest/plugins/splunk-hec-logging.md
+++ b/docs/en/latest/plugins/splunk-hec-logging.md
@@ -87,6 +87,7 @@ You can also set the format of the logs by configuring the
Plugin metadata. The
| Name | Type | Required | Default
| Description
|
| ---------- | ------ | -------- |
----------------------------------------------------------------------------- |
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
| log_format | object | False | | Log format declared as key value pairs
in JSON format. Values only support strings. [APISIX](../apisix-variable.md) or
[Nginx](http://nginx.org/en/docs/varindex.html) variables can be used by
prefixing the string with `$`. |
+| max_pending_entries | integer | False | | Maximum number of pending entries
that can be buffered in batch processor before it starts dropping them. |
:::info IMPORTANT
diff --git a/docs/en/latest/plugins/tcp-logger.md
b/docs/en/latest/plugins/tcp-logger.md
index 53fd43e66..514538452 100644
--- a/docs/en/latest/plugins/tcp-logger.md
+++ b/docs/en/latest/plugins/tcp-logger.md
@@ -100,6 +100,7 @@ You can also set the format of the logs by configuring the
Plugin metadata. The
| Name | Type | Required | Default
| Description
|
| ---------- | ------ | -------- |
----------------------------------------------------------------------------- |
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
| log_format | object | False | | Log format declared as key value pairs
in JSON format. Values only support strings. [APISIX](../apisix-variable.md) or
[Nginx](http://nginx.org/en/docs/varindex.html) variables can be used by
prefixing the string with `$`. |
+| max_pending_entries | integer | False | | Maximum number of pending entries
that can be buffered in batch processor before it starts dropping them. |
:::info IMPORTANT
diff --git a/docs/en/latest/plugins/tencent-cloud-cls.md
b/docs/en/latest/plugins/tencent-cloud-cls.md
index f1ee4c43d..c02ab10f1 100644
--- a/docs/en/latest/plugins/tencent-cloud-cls.md
+++ b/docs/en/latest/plugins/tencent-cloud-cls.md
@@ -99,6 +99,7 @@ You can also set the format of the logs by configuring the
Plugin metadata. The
| Name | Type | Required | Default
| Description
|
| ---------- | ------ | -------- |
----------------------------------------------------------------------------- |
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
| log_format | object | False | | Log format declared as key value pairs
in JSON format. Values only support strings. [APISIX](../apisix-variable.md) or
[Nginx](http://nginx.org/en/docs/varindex.html) variables can be used by
prefixing the string with `$`. |
+| max_pending_entries | integer | False | | Maximum number of pending entries
that can be buffered in batch processor before it starts dropping them. |
:::info IMPORTANT
diff --git a/docs/en/latest/plugins/udp-logger.md
b/docs/en/latest/plugins/udp-logger.md
index 503566a07..8001974c2 100644
--- a/docs/en/latest/plugins/udp-logger.md
+++ b/docs/en/latest/plugins/udp-logger.md
@@ -98,6 +98,7 @@ You can also set the format of the logs by configuring the
Plugin metadata. The
| Name | Type | Required | Default
| Description
|
| ---------- | ------ | -------- |
----------------------------------------------------------------------------- |
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
| log_format | object | False | | Log format declared as key value pairs
in JSON format. Values only support strings. [APISIX](../apisix-variable.md) or
[Nginx](http://nginx.org/en/docs/varindex.html) variables can be used by
prefixing the string with `$`. |
+| max_pending_entries | integer | False | | Maximum number of pending entries
that can be buffered in batch processor before it starts dropping them. |
:::info IMPORTANT
diff --git a/docs/zh/latest/plugins/clickhouse-logger.md
b/docs/zh/latest/plugins/clickhouse-logger.md
index f482f449d..05cf471b3 100644
--- a/docs/zh/latest/plugins/clickhouse-logger.md
+++ b/docs/zh/latest/plugins/clickhouse-logger.md
@@ -104,6 +104,7 @@ description: 本文介绍了 API 网关 Apache APISIX 如何使用 clickhouse-lo
| 名称 | 类型 | 必选项 | 默认值 | 有效值 | 描述
|
| ---------------- | ------- | ------ | ------------- | ------- |
------------------------------------------------ |
| log_format | object | 否 | | | 以 JSON
格式的键值对来声明日志格式。对于值部分,仅支持字符串。如果是以 `$` 开头,则表明是要获取 [APISIX](../apisix-variable.md)
或 [NGINX](http://nginx.org/en/docs/varindex.html) 变量。该配置全局生效。如果你指定了
`log_format`,该配置就会对所有绑定 `clickhouse-logger` 的路由或服务生效。|
+| max_pending_entries | integer | 否 | | | 在批处理器开始丢弃条目之前，可以缓冲的最大待处理条目数。|
:::note
diff --git a/docs/zh/latest/plugins/elasticsearch-logger.md
b/docs/zh/latest/plugins/elasticsearch-logger.md
index 362b57959..af473dff6 100644
--- a/docs/zh/latest/plugins/elasticsearch-logger.md
+++ b/docs/zh/latest/plugins/elasticsearch-logger.md
@@ -63,6 +63,7 @@ description: elasticsearch-logger Plugin 将请求和响应日志批量推送到
| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| log_format | object | 否 | |自定义日志格式为 JSON 格式的键值对。值中支持 [APISIX
变量](../apisix-variable.md) 和 [NGINX
变量](http://nginx.org/en/docs/varindex.html)。 |
+| max_pending_entries | integer | 否 | | 在批处理器开始丢弃条目之前，可以缓冲的最大待处理条目数。|
## 示例
diff --git a/docs/zh/latest/plugins/google-cloud-logging.md
b/docs/zh/latest/plugins/google-cloud-logging.md
index d485bee31..58c05dc13 100644
--- a/docs/zh/latest/plugins/google-cloud-logging.md
+++ b/docs/zh/latest/plugins/google-cloud-logging.md
@@ -89,6 +89,7 @@ description: API 网关 Apache APISIX 的 google-cloud-logging 插件可用于
| 名称 | 类型 | 必选项 | 默认值 | 有效值 | 描述
|
| ---------------- | ------- | ------ | ------------- | ------- |
------------------------------------------------ |
| log_format | object | 否 | | | 以 JSON
格式的键值对来声明日志格式。对于值部分,仅支持字符串。如果是以 `$` 开头。则表明获取 [APISIX 变量](../apisix-variable.md)
或 [NGINX 内置变量](http://nginx.org/en/docs/varindex.html)。 |
+| max_pending_entries | integer | 否 | | | 在批处理器开始丢弃条目之前，可以缓冲的最大待处理条目数。|
:::info 注意
diff --git a/docs/zh/latest/plugins/http-logger.md
b/docs/zh/latest/plugins/http-logger.md
index 1903ddd8b..5bde3434c 100644
--- a/docs/zh/latest/plugins/http-logger.md
+++ b/docs/zh/latest/plugins/http-logger.md
@@ -99,6 +99,7 @@ description: 本文介绍了 API 网关 Apache APISIX 的 http-logger 插件。
| 名称 | 类型 | 必选项 | 默认值 | 有效值 | 描述
|
| ---------------- | ------- | ------ | ------------- | ------- |
------------------------------------------------ |
| log_format | object | 否 | | | 以 JSON
格式的键值对来声明日志格式。对于值部分,仅支持字符串。如果是以 `$` 开头。则表明获取 [APISIX
变量](../../../en/latest/apisix-variable.md) 或 [NGINX
内置变量](http://nginx.org/en/docs/varindex.html)。 |
+| max_pending_entries | integer | 否 | | | 在批处理器开始丢弃条目之前，可以缓冲的最大待处理条目数。|
:::info 注意
diff --git a/docs/zh/latest/plugins/kafka-logger.md
b/docs/zh/latest/plugins/kafka-logger.md
index e708a21b8..886140e11 100644
--- a/docs/zh/latest/plugins/kafka-logger.md
+++ b/docs/zh/latest/plugins/kafka-logger.md
@@ -135,6 +135,7 @@ description: API 网关 Apache APISIX 的 kafka-logger 插件用于将日志作
| 名称 | 类型 | 必选项 | 默认值 | 描述
|
| ---------------- | ------- | ------ | -------------
|------------------------------------------------ |
| log_format | object | 否 | | 以 JSON
格式的键值对来声明日志格式。对于值部分,仅支持字符串。如果是以 `$` 开头,则表明是要获取 [APISIX
变量](../../../en/latest/apisix-variable.md) 或 [NGINX
内置变量](http://nginx.org/en/docs/varindex.html)。 |
+| max_pending_entries | integer | 否 | | 在批处理器开始丢弃条目之前，可以缓冲的最大待处理条目数。|
:::note 注意
diff --git a/docs/zh/latest/plugins/loki-logger.md
b/docs/zh/latest/plugins/loki-logger.md
index bc32dcad1..3294c8969 100644
--- a/docs/zh/latest/plugins/loki-logger.md
+++ b/docs/zh/latest/plugins/loki-logger.md
@@ -68,6 +68,7 @@ description: loki-logger 插件通过 Loki HTTP API /loki/api/v1/push
将请求
| 名称 | 类型 | 必选项 | 默认值 | 描述 |
|------|------|----------|--|-------------|
| log_format | object | 否 | | 日志格式以 JSON 格式声明为键值对。值只支持字符串类型。可以通过在字符串前面加上 `$`
来使用 [APISIX 变量](../apisix-variable.md) 和 [NGINX
变量](http://nginx.org/en/docs/varindex.html) 。 |
+| max_pending_entries | integer | 否 | | 在批处理器开始丢弃条目之前，可以缓冲的最大待处理条目数。|
## 示例
diff --git a/docs/zh/latest/plugins/rocketmq-logger.md
b/docs/zh/latest/plugins/rocketmq-logger.md
index 21d8e4284..dae7854f4 100644
--- a/docs/zh/latest/plugins/rocketmq-logger.md
+++ b/docs/zh/latest/plugins/rocketmq-logger.md
@@ -123,6 +123,7 @@ description: API 网关 Apache APISIX 的 rocketmq-logger 插件用于将日志
| 名称 | 类型 | 必选项 | 默认值
| 描述
|
|------------|--------|-----|-------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| log_format | object | 否 | | 以 JSON 格式的键值对来声明日志格式。对于值部分,仅支持字符串。如果是以 `$`
开头,则表明是要获取 [APISIX 变量](../../../en/latest/apisix-variable.md) 或 [NGINX
内置变量](http://nginx.org/en/docs/varindex.html)。 |
+| max_pending_entries | integer | 否 | | 在批处理器开始丢弃条目之前，可以缓冲的最大待处理条目数。|
:::note 注意
diff --git a/docs/zh/latest/plugins/skywalking-logger.md
b/docs/zh/latest/plugins/skywalking-logger.md
index 79eab655e..e1231da83 100644
--- a/docs/zh/latest/plugins/skywalking-logger.md
+++ b/docs/zh/latest/plugins/skywalking-logger.md
@@ -61,6 +61,7 @@ description: skywalking-logger 将请求和响应日志作为 JSON 对象批量
| 名称 | 类型 | 必选项 | 默认值 | 有效值 | 描述
|
| ---------------------- | ------- | ------ | -------------------- |
------------- |
---------------------------------------------------------------- |
| log_format | object | 否 | | 以 JSON 格式的键值对来声明日志格式。对于值部分,仅支持字符串。如果是以 `$`
开头,则表明是要获取 [APISIX 变量](../apisix-variable.md) 或 [NGINX
内置变量](http://nginx.org/en/docs/varindex.html)。 |
+| max_pending_entries | integer | 否 | | | 在批处理器开始丢弃条目之前，可以缓冲的最大待处理条目数。|
## 示例
diff --git a/docs/zh/latest/plugins/splunk-hec-logging.md
b/docs/zh/latest/plugins/splunk-hec-logging.md
index ef2259409..7b7983b02 100644
--- a/docs/zh/latest/plugins/splunk-hec-logging.md
+++ b/docs/zh/latest/plugins/splunk-hec-logging.md
@@ -84,6 +84,7 @@ description: API 网关 Apache APISIX 的 splunk-hec-logging 插件可用于将
| 名称 | 类型 | 必选项 | 默认值 | 有效值 | 描述
|
| ---------------- | ------- | ------ | ------------- | ------- |
------------------------------------------------ |
| log_format | object | 否 | | | 以 JSON
格式的键值对来声明日志格式。对于值部分,仅支持字符串。如果是以 `$` 开头。则表明获取 [APISIX 变量](../apisix-variable.md)
或 [NGINX 内置变量](http://nginx.org/en/docs/varindex.html)。 |
+| max_pending_entries | integer | 否 | | | 在批处理器开始丢弃条目之前，可以缓冲的最大待处理条目数。|
:::info 注意
diff --git a/docs/zh/latest/plugins/tcp-logger.md
b/docs/zh/latest/plugins/tcp-logger.md
index 158bb7cba..b787c16bc 100644
--- a/docs/zh/latest/plugins/tcp-logger.md
+++ b/docs/zh/latest/plugins/tcp-logger.md
@@ -95,6 +95,7 @@ description: 本文介绍了 API 网关 Apache APISIX 如何使用 tcp-logger
| 名称 | 类型 | 必选项 | 默认值 | 有效值 | 描述
|
| ---------------- | ------- | ------ | ------------- | ------- |
------------------------------------------------ |
| log_format | object | 否 | | | 以 JSON
格式的键值对来声明日志格式。对于值部分,仅支持字符串。如果是以 `$` 开头。则表明获取 [APISIX 变量](../apisix-variable.md)
或 [NGINX 内置变量](http://nginx.org/en/docs/varindex.html)。 |
+| max_pending_entries | integer | 否 | | | 在批处理器开始丢弃条目之前，可以缓冲的最大待处理条目数。|
:::info 注意
diff --git a/docs/zh/latest/plugins/tencent-cloud-cls.md
b/docs/zh/latest/plugins/tencent-cloud-cls.md
index a5c41bd72..418d371bd 100644
--- a/docs/zh/latest/plugins/tencent-cloud-cls.md
+++ b/docs/zh/latest/plugins/tencent-cloud-cls.md
@@ -97,6 +97,7 @@ description: API 网关 Apache APISIX tencent-cloud-cls 插件可用于将日志
| 名称 | 类型 | 必选项 | 默认值 | 有效值 | 描述
|
| ---------------- | ------- | ------ | ------------- | ------- |
------------------------------------------------ |
| log_format | object | 否 | | | 以 JSON
格式的键值对来声明日志格式。对于值部分,仅支持字符串。如果是以 `$` 开头。则表明获取 [APISIX
变量](../../../en/latest/apisix-variable.md) 或 [NGINX
内置变量](http://nginx.org/en/docs/varindex.html)。 |
+| max_pending_entries | integer | 否 | | | 在批处理器开始丢弃条目之前，可以缓冲的最大待处理条目数。|
:::info 重要
diff --git a/docs/zh/latest/plugins/udp-logger.md
b/docs/zh/latest/plugins/udp-logger.md
index 45d0983ff..d0efa2a08 100644
--- a/docs/zh/latest/plugins/udp-logger.md
+++ b/docs/zh/latest/plugins/udp-logger.md
@@ -94,6 +94,7 @@ description: 本文介绍了 API 网关 Apache APISIX 如何使用 udp-logger
| 名称 | 类型 | 必选项 | 默认值 | 有效值 | 描述
|
| ---------------- | ------- | ------ | ------------- | ------- |
------------------------------------------------ |
| log_format | object | 否 | | | 以 JSON
格式的键值对来声明日志格式。对于值部分,仅支持字符串。如果是以 `$` 开头。则表明获取 [APISIX 变量](../apisix-variable.md)
或 [NGINX 内置变量](http://nginx.org/en/docs/varindex.html)。 |
+| max_pending_entries | integer | 否 | | | 在批处理器开始丢弃条目之前，可以缓冲的最大待处理条目数。|
:::info 注意
diff --git a/t/plugin/clickhouse-logger.t b/t/plugin/clickhouse-logger.t
index 4efcf11e3..e3f90c339 100644
--- a/t/plugin/clickhouse-logger.t
+++ b/t/plugin/clickhouse-logger.t
@@ -313,3 +313,90 @@ GET /opentracing
echo "select * from default.test" | curl 'http://localhost:8123/'
--data-binary @-
--- response_body_like
.*127.0.0.1.*1.*
+
+
+
+=== TEST 12: should drop entries when max_pending_entries is exceeded
+--- extra_yaml_config
+plugins:
+ - clickhouse-logger
+--- config
+location /t {
+ content_by_lua_block {
+ local http = require "resty.http"
+ local httpc = http.new()
+ local data = {
+ {
+ input = {
+ plugins = {
+ ["clickhouse-logger"] = {
+ user = "default",
+ password = "a",
+ database = "default",
+ logtable = "t",
+ endpoint_addr =
"http://127.0.0.1:1234/clickhouse-logger/test1",
+ batch_max_size = 1,
+ timeout = 1,
+ max_retry_count = 10
+ }
+ },
+ upstream = {
+ nodes = {
+ ["127.0.0.1:1980"] = 1
+ },
+ type = "roundrobin"
+ },
+ uri = "/hello",
+ },
+ },
+ }
+
+ local t = require("lib.test_admin").test
+
+ -- Set plugin metadata
+ local metadata = {
+ log_format = {
+ host = "$host",
+ ["@timestamp"] = "$time_iso8601",
+ client_ip = "$remote_addr"
+ },
+ max_pending_entries = 1
+ }
+
+ local code, body =
t('/apisix/admin/plugin_metadata/clickhouse-logger', ngx.HTTP_PUT, metadata)
+ if code >= 300 then
+ ngx.status = code
+ ngx.say(body)
+ return
+ end
+
+ -- Create route
+ local code, body = t('/apisix/admin/routes/1', ngx.HTTP_PUT,
data[1].input)
+ if code >= 300 then
+ ngx.status = code
+ ngx.say(body)
+ return
+ end
+
+ local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
+ httpc:request_uri(uri, {
+ method = "GET",
+ keepalive_timeout = 1,
+ keepalive_pool = 1,
+ })
+ httpc:request_uri(uri, {
+ method = "GET",
+ keepalive_timeout = 1,
+ keepalive_pool = 1,
+ })
+ httpc:request_uri(uri, {
+ method = "GET",
+ keepalive_timeout = 1,
+ keepalive_pool = 1,
+ })
+ ngx.sleep(2)
+ }
+}
+--- error_log
+max pending entries limit exceeded. discarding entry
+--- timeout: 5
diff --git a/t/plugin/elasticsearch-logger2.t b/t/plugin/elasticsearch-logger2.t
new file mode 100644
index 000000000..18610a3e6
--- /dev/null
+++ b/t/plugin/elasticsearch-logger2.t
@@ -0,0 +1,121 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+use t::APISIX 'no_plan';
+
+log_level('debug');
+repeat_each(1);
+no_long_string();
+no_root_location();
+no_shuffle();
+
+add_block_preprocessor(sub {
+ my ($block) = @_;
+
+ if (!defined $block->request) {
+ $block->set_value("request", "GET /t");
+ }
+
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: should drop entries when max_pending_entries is exceeded
+--- extra_yaml_config
+plugins:
+ - elasticsearch-logger
+--- config
+location /t {
+ content_by_lua_block {
+ local http = require "resty.http"
+ local httpc = http.new()
+ local data = {
+ {
+ input = {
+ plugins = {
+ ["elasticsearch-logger"] = {
+ endpoint_addr = "http://127.0.0.1:1234",
+ field = {
+ index = "services"
+ },
+ batch_max_size = 1,
+ timeout = 1,
+ max_retry_count = 10
+ }
+ },
+ upstream = {
+ nodes = {
+ ["127.0.0.1:1980"] = 1
+ },
+ type = "roundrobin"
+ },
+ uri = "/hello",
+ },
+ },
+ }
+
+ local t = require("lib.test_admin").test
+
+ -- Set plugin metadata
+ local metadata = {
+ log_format = {
+ host = "$host",
+ ["@timestamp"] = "$time_iso8601",
+ client_ip = "$remote_addr"
+ },
+ max_pending_entries = 1
+ }
+
+ local code, body =
t('/apisix/admin/plugin_metadata/elasticsearch-logger', ngx.HTTP_PUT, metadata)
+ if code >= 300 then
+ ngx.status = code
+ ngx.say(body)
+ return
+ end
+
+ -- Create route
+ local code, body = t('/apisix/admin/routes/1', ngx.HTTP_PUT,
data[1].input)
+ if code >= 300 then
+ ngx.status = code
+ ngx.say(body)
+ return
+ end
+
+ local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
+ httpc:request_uri(uri, {
+ method = "GET",
+ keepalive_timeout = 1,
+ keepalive_pool = 1,
+ })
+ httpc:request_uri(uri, {
+ method = "GET",
+ keepalive_timeout = 1,
+ keepalive_pool = 1,
+ })
+ httpc:request_uri(uri, {
+ method = "GET",
+ keepalive_timeout = 1,
+ keepalive_pool = 1,
+ })
+ ngx.sleep(2)
+ }
+}
+--- error_log
+max pending entries limit exceeded. discarding entry
+--- timeout: 5
diff --git a/t/plugin/google-cloud-logging3.t b/t/plugin/google-cloud-logging3.t
new file mode 100644
index 000000000..df70f5de5
--- /dev/null
+++ b/t/plugin/google-cloud-logging3.t
@@ -0,0 +1,153 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+no_long_string();
+no_root_location();
+
+add_block_preprocessor(sub {
+ my ($block) = @_;
+
+ if (!defined $block->request) {
+ $block->set_value("request", "GET /t");
+ }
+
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: should drop entries when max_pending_entries is exceeded
+--- extra_yaml_config
+plugins:
+ - google-cloud-logging
+--- config
+location /t {
+ content_by_lua_block {
+ local http = require "resty.http"
+ local httpc = http.new()
+ local data = {
+ {
+ input = {
+ plugins = {
+ ["google-cloud-logging"] = {
+ auth_config = {
+ client_email =
"[email protected]",
+ private_key = [[
+-----BEGIN PRIVATE KEY-----
+MIIEvAIBADANBgkqhkiG9w0BAQEFAASCBKYwggSiAgEAAoIBAQDDzrFwnA3EvYyR
+aeMgaLD3hBjvxKrz10uox1X8q7YYhf2ViRtLRUMa2bEMYksE5hbhwpNf6mKAnLOC
+UuAT6cPPdUl/agKpJXviBPIR2LuzD17WsLJHp1HxUDssSkgfCaGcOGGNfLUhhIpF
+2JUctLmxiZoAZySlSjcwupSuDJ0aPm0XO8r9H8Qu5kF2Vkz5e5bFivLTmvzrQTe4
+v5V1UI6hThElCSeUmdNF3uG3wopxlvq4zXgLTnuLbrNf/Gc4mlpV+UDgTISj32Ep
+AB2vxKEbvQw4ti8YJnGXWjxLerhfrszFw+V8lpeduiDYA44ZFoVqvzxeIsVZNtcw
+Iu7PvEPNAgMBAAECggEAVpyN9m7A1F631/aLheFpLgMbeKt4puV7zQtnaJ2XrZ9P
+PR7pmNDpTu4uF3k/D8qrIm+L+uhVa+hkquf3wDct6w1JVnfQ93riImbnoKdK13ic
+DcEZCwLjByfjFMNCxZ/gAZca55fbExlqhFy6EHmMjhB8s2LsXcTHRuGxNI/Vyi49
+sxECibe0U53aqdJbVWrphIS67cpwl4TUkN6mrHsNuDYNJ9dgkpapoqp4FTFQsBqC
+afOK5qgJ68dWZ47FBUng+AZjdCncqAIuJxxItGVQP6YPsFs+OXcivIVHJr363TpC
+l85FfdvqWV5OGBbwSKhNwiTNUVvfSQVmtURGWG/HbQKBgQD4gZ1z9+Lx19kT9WTz
+lw93lxso++uhAPDTKviyWSRoEe5aN3LCd4My+/Aj+sk4ON/s2BV3ska5Im93j+vC
+rCv3uPn1n2jUhWuJ3bDqipeTW4n/CQA2m/8vd26TMk22yOkkqw2MIA8sjJ//SD7g
+tdG7up6DgGMP4hgbO89uGU7DAwKBgQDJtkKd0grh3u52Foeh9YaiAgYRwc65IE16
+UyD1OJxIuX/dYQDLlo5KyyngFa1ZhWIs7qC7r3xXH+10kfJY+Q+5YMjmZjlL8SR1
+Ujqd02R9F2//6OeswyReachJZbZdtiEw3lPa4jVFYfhSe0M2ZPxMwvoXb25eyCNI
+1lYjSKq87wKBgHnLTNghjeDp4UKe6rNYPgRm0rDrhziJtX5JeUov1mALKb6dnmkh
+GfRK9g8sQqKDfXwfC6Z2gaMK9YaryujGaWYoCpoPXtmJ6oLPXH4XHuLh4mhUiP46
+xn8FEfSimuQS4/FMxH8A128GHQSI7AhGFFzlwfrBWcvXC+mNDsTvMmLxAoGARc+4
+upppfccETQZ7JsitMgD1TMwA2f2eEwoWTAitvlXFNT9PYSbYVHaAJbga6PLLCbYF
+FzAjHpxEOKYSdEyu7n/ayDL0/Z2V+qzc8KarDsg/0RgwppBbU/nUgeKb/U79qcYo
+y4ai3UKNCS70Ei1dTMvmdpnwXwlxfNIBufB6dy0CgYBMYq9Lc31GkC6PcGEEbx6W
+vjImOadWZbuOVnvEQjb5XCdcOsWsMcg96PtoeuyyHmhnEF1GsMzcIdQv/PHrvYpK
+Yp8D0aqsLEgwGrJQER26FPpKmyIwvcL+nm6q5W31PnU9AOC/WEkB6Zs58hsMzD2S
+kEJQcmfVew5mFXyxuEn3zA==
+-----END PRIVATE KEY-----]],
+ project_id = "apisix",
+ token_uri =
"http://127.0.0.1:1234/google/logging/token",
+ scope = {
+ "https://apisix.apache.org/logs:admin"
+ },
+ entries_uri =
"http://127.0.0.1:1234/google/logging/entries",
+ },
+ batch_max_size = 1,
+ timeout = 1,
+ max_retry_count = 10
+ }
+ },
+ upstream = {
+ nodes = {
+ ["127.0.0.1:1980"] = 1
+ },
+ type = "roundrobin"
+ },
+ uri = "/hello",
+ },
+ },
+ }
+
+ local t = require("lib.test_admin").test
+
+ -- Set plugin metadata
+ local metadata = {
+ log_format = {
+ host = "$host",
+ ["@timestamp"] = "$time_iso8601",
+ client_ip = "$remote_addr"
+ },
+ max_pending_entries = 1
+ }
+
+ local code, body =
t('/apisix/admin/plugin_metadata/google-cloud-logging', ngx.HTTP_PUT, metadata)
+ if code >= 300 then
+ ngx.status = code
+ ngx.say(body)
+ return
+ end
+
+ -- Create route
+ local code, body = t('/apisix/admin/routes/1', ngx.HTTP_PUT,
data[1].input)
+ if code >= 300 then
+ ngx.status = code
+ ngx.say(body)
+ return
+ end
+
+ local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
+ httpc:request_uri(uri, {
+ method = "GET",
+ keepalive_timeout = 1,
+ keepalive_pool = 1,
+ })
+ httpc:request_uri(uri, {
+ method = "GET",
+ keepalive_timeout = 1,
+ keepalive_pool = 1,
+ })
+ httpc:request_uri(uri, {
+ method = "GET",
+ keepalive_timeout = 1,
+ keepalive_pool = 1,
+ })
+ ngx.sleep(2)
+ }
+}
+--- error_log
+max pending entries limit exceeded. discarding entry
+--- timeout: 5
diff --git a/t/plugin/http-logger3.t b/t/plugin/http-logger3.t
new file mode 100644
index 000000000..70557e40e
--- /dev/null
+++ b/t/plugin/http-logger3.t
@@ -0,0 +1,121 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+log_level('debug');
+repeat_each(1);
+no_long_string();
+no_root_location();
+
+add_block_preprocessor(sub {
+ my ($block) = @_;
+
+ if (!$block->request) {
+ $block->set_value("request", "GET /t");
+ }
+ my $extra_init_by_lua = <<_EOC_;
+ local bpm = require("apisix.utils.batch-processor-manager")
+ bpm.set_check_stale_interval(1)
+_EOC_
+
+ $block->set_value("extra_init_by_lua", $extra_init_by_lua);
+});
+
+run_tests;
+
+__DATA__
+
+=== TEST 1: should drop entries when max_pending_entries is exceeded
+--- extra_yaml_config
+plugins:
+ - http-logger
+--- config
+location /t {
+ content_by_lua_block {
+ local http = require "resty.http"
+ local httpc = http.new()
+ local data = {
+ {
+ input = {
+ plugins = {
+ ["http-logger"] = {
+ uri = "http://127.0.0.1:1234/http-logger/test",
+ batch_max_size = 1,
+ timeout = 1,
+ max_retry_count = 10
+ }
+ },
+ upstream = {
+ nodes = {
+ ["127.0.0.1:1980"] = 1
+ },
+ type = "roundrobin"
+ },
+ uri = "/hello",
+ },
+ },
+ }
+
+ local t = require("lib.test_admin").test
+
+ -- Set plugin metadata
+ local metadata = {
+ log_format = {
+ host = "$host",
+ ["@timestamp"] = "$time_iso8601",
+ client_ip = "$remote_addr"
+ },
+ max_pending_entries = 1
+ }
+
+ local code, body = t('/apisix/admin/plugin_metadata/http-logger',
ngx.HTTP_PUT, metadata)
+ if code >= 300 then
+ ngx.status = code
+ ngx.say(body)
+ return
+ end
+
+ -- Create route
+ local code, body = t('/apisix/admin/routes/1', ngx.HTTP_PUT,
data[1].input)
+ if code >= 300 then
+ ngx.status = code
+ ngx.say(body)
+ return
+ end
+
+ local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
+ httpc:request_uri(uri, {
+ method = "GET",
+ keepalive_timeout = 1,
+ keepalive_pool = 1,
+ })
+ httpc:request_uri(uri, {
+ method = "GET",
+ keepalive_timeout = 1,
+ keepalive_pool = 1,
+ })
+ httpc:request_uri(uri, {
+ method = "GET",
+ keepalive_timeout = 1,
+ keepalive_pool = 1,
+ })
+ ngx.sleep(2)
+ }
+}
+--- error_log
+max pending entries limit exceeded. discarding entry
+--- timeout: 5
diff --git a/t/plugin/loki-logger2.t b/t/plugin/loki-logger2.t
new file mode 100644
index 000000000..a0f0f0ab9
--- /dev/null
+++ b/t/plugin/loki-logger2.t
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+no_long_string();
+no_root_location();
+
+add_block_preprocessor(sub {
+ my ($block) = @_;
+
+ if (!defined $block->request) {
+ $block->set_value("request", "GET /t");
+ }
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: should drop entries when max_pending_entries is exceeded
+--- extra_yaml_config
+plugins:
+ - loki-logger
+--- config
+location /t {
+ content_by_lua_block {
+ local http = require "resty.http"
+ local httpc = http.new()
+ local data = {
+ {
+ input = {
+ plugins = {
+ ["loki-logger"] = {
+ endpoint_addrs = {"http://127.0.0.1:1234"},
+ batch_max_size = 1,
+ timeout = 1,
+ max_retry_count = 10
+ }
+ },
+ upstream = {
+ nodes = {
+ ["127.0.0.1:1980"] = 1
+ },
+ type = "roundrobin"
+ },
+ uri = "/hello",
+ },
+ },
+ }
+
+ local t = require("lib.test_admin").test
+
+ -- Set plugin metadata
+ local metadata = {
+ log_format = {
+ host = "$host",
+ ["@timestamp"] = "$time_iso8601",
+ client_ip = "$remote_addr"
+ },
+ max_pending_entries = 1
+ }
+
+ local code, body = t('/apisix/admin/plugin_metadata/loki-logger',
ngx.HTTP_PUT, metadata)
+ if code >= 300 then
+ ngx.status = code
+ ngx.say(body)
+ return
+ end
+
+ -- Create route
+ local code, body = t('/apisix/admin/routes/1', ngx.HTTP_PUT,
data[1].input)
+ if code >= 300 then
+ ngx.status = code
+ ngx.say(body)
+ return
+ end
+
+ local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
+ httpc:request_uri(uri, {
+ method = "GET",
+ keepalive_timeout = 1,
+ keepalive_pool = 1,
+ })
+ httpc:request_uri(uri, {
+ method = "GET",
+ keepalive_timeout = 1,
+ keepalive_pool = 1,
+ })
+ httpc:request_uri(uri, {
+ method = "GET",
+ keepalive_timeout = 1,
+ keepalive_pool = 1,
+ })
+ ngx.sleep(2)
+ }
+}
+--- error_log
+max pending entries limit exceeded. discarding entry
+--- timeout: 5
diff --git a/t/plugin/skywalking-logger2.t b/t/plugin/skywalking-logger2.t
new file mode 100644
index 000000000..0148ec4fc
--- /dev/null
+++ b/t/plugin/skywalking-logger2.t
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+log_level('debug');
+repeat_each(1);
+no_long_string();
+no_root_location();
+
+add_block_preprocessor(sub {
+ my ($block) = @_;
+ if (!$block->request) {
+ $block->set_value("request", "GET /t");
+ }
+});
+
+run_tests;
+
+__DATA__
+
+=== TEST 1: should drop entries when max_pending_entries is exceeded
+--- extra_yaml_config
+plugins:
+ - skywalking-logger
+--- config
+location /t {
+ content_by_lua_block {
+ local http = require "resty.http"
+ local httpc = http.new()
+ local data = {
+ {
+ input = {
+ plugins = {
+ ["skywalking-logger"] = {
+ endpoint_addr = "http://127.0.0.1:1234/v3/logs",
+ batch_max_size = 1,
+ timeout = 1,
+ max_retry_count = 10
+ }
+ },
+ upstream = {
+ nodes = {
+ ["127.0.0.1:1980"] = 1
+ },
+ type = "roundrobin"
+ },
+ uri = "/hello",
+ },
+ },
+ }
+
+ local t = require("lib.test_admin").test
+
+ -- Set plugin metadata
+ local metadata = {
+ log_format = {
+ host = "$host",
+ ["@timestamp"] = "$time_iso8601",
+ client_ip = "$remote_addr"
+ },
+ max_pending_entries = 1
+ }
+
+ local code, body =
t('/apisix/admin/plugin_metadata/skywalking-logger', ngx.HTTP_PUT, metadata)
+ if code >= 300 then
+ ngx.status = code
+ ngx.say(body)
+ return
+ end
+
+ -- Create route
+ local code, body = t('/apisix/admin/routes/1', ngx.HTTP_PUT,
data[1].input)
+ if code >= 300 then
+ ngx.status = code
+ ngx.say(body)
+ return
+ end
+
+ local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
+ httpc:request_uri(uri, {
+ method = "GET",
+ keepalive_timeout = 1,
+ keepalive_pool = 1,
+ })
+ httpc:request_uri(uri, {
+ method = "GET",
+ keepalive_timeout = 1,
+ keepalive_pool = 1,
+ })
+ httpc:request_uri(uri, {
+ method = "GET",
+ keepalive_timeout = 1,
+ keepalive_pool = 1,
+ })
+ ngx.sleep(2)
+ }
+}
+--- error_log
+max pending entries limit exceeded. discarding entry
+--- timeout: 5
diff --git a/t/plugin/splunk-hec-logging2.t b/t/plugin/splunk-hec-logging2.t
new file mode 100644
index 000000000..d9d4a4018
--- /dev/null
+++ b/t/plugin/splunk-hec-logging2.t
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+no_long_string();
+no_root_location();
+
+add_block_preprocessor(sub {
+ my ($block) = @_;
+
+ if (!defined $block->request) {
+ $block->set_value("request", "GET /t");
+ }
+
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: should drop entries when max_pending_entries is exceeded
+--- extra_yaml_config
+plugins:
+ - splunk-hec-logging
+--- config
+location /t {
+ content_by_lua_block {
+ local http = require "resty.http"
+ local httpc = http.new()
+ local data = {
+ {
+ input = {
+ plugins = {
+ ["splunk-hec-logging"] = {
+ endpoint = {
+ uri =
"http://127.0.0.1:1234/services/collector",
+ token = "BD274822-96AA-4DA6-90EC-18940FB2414C"
+ },
+ batch_max_size = 1,
+ timeout = 1,
+ max_retry_count = 10
+ }
+ },
+ upstream = {
+ nodes = {
+ ["127.0.0.1:1980"] = 1
+ },
+ type = "roundrobin"
+ },
+ uri = "/hello",
+ },
+ },
+ }
+
+ local t = require("lib.test_admin").test
+
+ -- Set plugin metadata
+ local metadata = {
+ log_format = {
+ host = "$host",
+ ["@timestamp"] = "$time_iso8601",
+ client_ip = "$remote_addr"
+ },
+ max_pending_entries = 1
+ }
+
+ local code, body =
t('/apisix/admin/plugin_metadata/splunk-hec-logging', ngx.HTTP_PUT, metadata)
+ if code >= 300 then
+ ngx.status = code
+ ngx.say(body)
+ return
+ end
+
+ -- Create route
+ local code, body = t('/apisix/admin/routes/1', ngx.HTTP_PUT,
data[1].input)
+ if code >= 300 then
+ ngx.status = code
+ ngx.say(body)
+ return
+ end
+
+ local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
+ httpc:request_uri(uri, {
+ method = "GET",
+ keepalive_timeout = 1,
+ keepalive_pool = 1,
+ })
+ httpc:request_uri(uri, {
+ method = "GET",
+ keepalive_timeout = 1,
+ keepalive_pool = 1,
+ })
+ httpc:request_uri(uri, {
+ method = "GET",
+ keepalive_timeout = 1,
+ keepalive_pool = 1,
+ })
+ ngx.sleep(2)
+ }
+}
+--- error_log
+max pending entries limit exceeded. discarding entry
+--- timeout: 5