This is an automated email from the ASF dual-hosted git repository.
spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new 1a66fc52b chore(kafka-logger): support configuring the `meta_refresh_interval` parameter (#8762)
1a66fc52b is described below
commit 1a66fc52b3a9051feb0d30f885a6f05be56bd632
Author: JunXu Chen <[email protected]>
AuthorDate: Mon Feb 6 10:00:23 2023 +0800
chore(kafka-logger): support configuring the `meta_refresh_interval` parameter (#8762)
Fixes https://github.com/apache/apisix/issues/6033
---
apisix/plugins/kafka-logger.lua | 2 ++
docs/en/latest/plugins/kafka-logger.md | 1 +
docs/zh/latest/plugins/kafka-logger.md | 1 +
t/plugin/kafka-logger2.t | 58 ++++++++++++++++++++++++++++++++++
4 files changed, 62 insertions(+)
diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua
index 0b22d92e5..a4bfcda1b 100644
--- a/apisix/plugins/kafka-logger.lua
+++ b/apisix/plugins/kafka-logger.lua
@@ -122,6 +122,7 @@ local schema = {
        producer_batch_size = {type = "integer", minimum = 0, default = 1048576},
        producer_max_buffering = {type = "integer", minimum = 1, default = 50000},
        producer_time_linger = {type = "integer", minimum = 1, default = 1},
+        meta_refresh_interval = {type = "integer", minimum = 1, default = 30},
    },
    oneOf = {
        { required = {"broker_list", "kafka_topic"},},
@@ -246,6 +247,7 @@ function _M.log(conf, ctx)
    broker_config["batch_size"] = conf.producer_batch_size
    broker_config["max_buffering"] = conf.producer_max_buffering
    broker_config["flush_time"] = conf.producer_time_linger * 1000
+    broker_config["refresh_interval"] = conf.meta_refresh_interval * 1000
    local prod, err = core.lrucache.plugin_ctx(lrucache, ctx, nil, create_producer,
                                               broker_list, broker_config, conf.cluster_name)
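For context on the change above: `meta_refresh_interval` is configured in seconds and multiplied by 1000 before being passed to lua-resty-kafka as `refresh_interval`, the interval at which the client re-fetches broker metadata. Below is a minimal sketch of that mapping outside the plugin, not the plugin's actual code path; the broker address, topic, and key are placeholders, and it assumes an OpenResty worker because the async producer relies on ngx timers.

```lua
-- Sketch only: shows the second-to-millisecond mapping the plugin performs
-- before handing the options to lua-resty-kafka. Broker/topic/key are placeholders.
local producer = require("resty.kafka.producer")

local broker_list = {
    { host = "127.0.0.1", port = 9092 },
}

local broker_config = {
    producer_type    = "async",
    batch_size       = 1048576,     -- bytes, cf. producer_batch_size
    max_buffering    = 50000,       -- messages, cf. producer_max_buffering
    flush_time       = 1 * 1000,    -- ms, cf. producer_time_linger (seconds)
    refresh_interval = 30 * 1000,   -- ms, cf. meta_refresh_interval (seconds)
}

local prod = producer:new(broker_list, broker_config)
local ok, err = prod:send("test2", "key1", "hello kafka")
if not ok then
    ngx.log(ngx.ERR, "failed to send to kafka: ", err)
end
```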
diff --git a/docs/en/latest/plugins/kafka-logger.md b/docs/en/latest/plugins/kafka-logger.md
index 2f49108a7..24ec21de5 100644
--- a/docs/en/latest/plugins/kafka-logger.md
+++ b/docs/en/latest/plugins/kafka-logger.md
@@ -61,6 +61,7 @@ It might take some time to receive the log data. It will be automatically sent a
| producer_batch_size | integer | optional | 1048576 | [0,...] | `batch_size` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) in bytes. [...]
| producer_max_buffering | integer | optional | 50000 | [1,...] | `max_buffering` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) representing maximum buffer size. Unit is message count. [...]
| producer_time_linger | integer | optional | 1 | [1,...] | `flush_time` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) in seconds. [...]
+| meta_refresh_interval | integer | optional | 30 | [1,...] | `refresh_interval` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) specifying the interval, in seconds, at which metadata is automatically refreshed. [...]
This Plugin supports using batch processors to aggregate and process entries (logs/data) in a batch, which avoids the need to submit data frequently. The batch processor submits data every `5` seconds or when the data in the queue reaches `1000` entries. See [Batch Processor](../batch-processor.md#configuration) for more information or to set your custom configuration.
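Because the attribute is optional, existing routes keep working unchanged: the schema's declared default of `30` is filled in during validation, the same mechanism the runtime code relies on when it reads `conf.producer_batch_size` and friends directly. A rough sketch of checking that, assuming it runs inside an APISIX-enabled OpenResty worker (broker address and topic are placeholders):

```lua
-- Sketch only: validate a minimal kafka-logger conf and inspect the default
-- that the schema fills in for the omitted meta_refresh_interval.
local plugin = require("apisix.plugins.kafka-logger")

local conf = {
    brokers = { { host = "127.0.0.1", port = 9092 } },  -- placeholder broker
    kafka_topic = "test2",                              -- placeholder topic
}

local ok, err = plugin.check_schema(conf)
assert(ok, err)

-- The schema declares `default = 30`, so this logs 30 (seconds);
-- _M.log later multiplies it by 1000 for lua-resty-kafka.
ngx.log(ngx.WARN, "meta_refresh_interval: ", conf.meta_refresh_interval)
```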
diff --git a/docs/zh/latest/plugins/kafka-logger.md b/docs/zh/latest/plugins/kafka-logger.md
index 5a6fd987b..ea68923d7 100644
--- a/docs/zh/latest/plugins/kafka-logger.md
+++ b/docs/zh/latest/plugins/kafka-logger.md
@@ -59,6 +59,7 @@ description: The kafka-logger plugin for the Apache APISIX API gateway is used to push logs as
| producer_batch_size | integer | No | 1048576 | [0,...] | Corresponds to the `batch_size` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka), in bytes. |
| producer_max_buffering | integer | No | 50000 | [1,...] | Corresponds to the `max_buffering` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka), the maximum buffer size, in number of messages. |
| producer_time_linger | integer | No | 1 | [1,...] | Corresponds to the `flush_time` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka), in seconds. |
+| meta_refresh_interval | integer | No | 30 | [1,...] | Corresponds to the `refresh_interval` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka), the interval for automatically refreshing metadata, in seconds. |
This plugin supports using a batch processor to aggregate and process entries (logs/data) in batches, which avoids submitting data too frequently. By default, the batch processor submits data every `5` seconds or when the data in the queue reaches `1000` entries. For the batch processor's parameter settings, see the [Batch-Processor](../batch-processor.md#配置) configuration section.
diff --git a/t/plugin/kafka-logger2.t b/t/plugin/kafka-logger2.t
index 5c4e0169d..4aabf8756 100644
--- a/t/plugin/kafka-logger2.t
+++ b/t/plugin/kafka-logger2.t
@@ -895,3 +895,61 @@ hello world
--- error_log eval
qr/send data to kafka: \{.*"body":"abcdef"/
--- wait: 2
+
+
+
+=== TEST 22: setup route with meta_refresh_interval
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [=[{
+ "plugins": {
+ "kafka-logger": {
+ "brokers" :
+ [{
+ "host":"127.0.0.1",
+ "port": 9092
+ }],
+ "kafka_topic" : "test2",
+ "key" : "key1",
+ "timeout" : 1,
+ "meta_refresh_interval": 1,
+ "batch_max_size": 1,
+ "include_req_body": true
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1980": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/hello"
+ }]=]
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+
+--- response_body
+passed
+
+
+
+=== TEST 23: hit route, send data to kafka successfully
+--- request
+POST /hello
+abcdef
+--- response_body
+hello world
+--- no_error_log
+[error]
+--- error_log eval
+qr/send data to kafka: \{.*"body":"abcdef"/
+--- wait: 2