This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a66fc52b chore(kafka-logger): support configuration 
`meta_refresh_interval` parameter (#8762)
1a66fc52b is described below

commit 1a66fc52b3a9051feb0d30f885a6f05be56bd632
Author: JunXu Chen <[email protected]>
AuthorDate: Mon Feb 6 10:00:23 2023 +0800

    chore(kafka-logger): support configuration `meta_refresh_interval` 
parameter (#8762)
    
    Fixes https://github.com/apache/apisix/issues/6033
---
 apisix/plugins/kafka-logger.lua        |  2 ++
 docs/en/latest/plugins/kafka-logger.md |  1 +
 docs/zh/latest/plugins/kafka-logger.md |  1 +
 t/plugin/kafka-logger2.t               | 58 ++++++++++++++++++++++++++++++++++
 4 files changed, 62 insertions(+)

diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua
index 0b22d92e5..a4bfcda1b 100644
--- a/apisix/plugins/kafka-logger.lua
+++ b/apisix/plugins/kafka-logger.lua
@@ -122,6 +122,7 @@ local schema = {
         producer_batch_size = {type = "integer", minimum = 0, default = 
1048576},
         producer_max_buffering = {type = "integer", minimum = 1, default = 
50000},
         producer_time_linger = {type = "integer", minimum = 1, default = 1},
+        meta_refresh_interval = {type = "integer", minimum = 1, default = 30},
     },
     oneOf = {
         { required = {"broker_list", "kafka_topic"},},
@@ -246,6 +247,7 @@ function _M.log(conf, ctx)
     broker_config["batch_size"] = conf.producer_batch_size
     broker_config["max_buffering"] = conf.producer_max_buffering
     broker_config["flush_time"] = conf.producer_time_linger * 1000
+    broker_config["refresh_interval"] = conf.meta_refresh_interval * 1000
 
     local prod, err = core.lrucache.plugin_ctx(lrucache, ctx, nil, 
create_producer,
                                                broker_list, broker_config, 
conf.cluster_name)
diff --git a/docs/en/latest/plugins/kafka-logger.md 
b/docs/en/latest/plugins/kafka-logger.md
index 2f49108a7..24ec21de5 100644
--- a/docs/en/latest/plugins/kafka-logger.md
+++ b/docs/en/latest/plugins/kafka-logger.md
@@ -61,6 +61,7 @@ It might take some time to receive the log data. It will be 
automatically sent a
 | producer_batch_size    | integer | optional    | 1048576        | [0,...]    
           | `batch_size` parameter in 
[lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) in bytes.      
                                                                                
                                                                                
                                                                                
                                                       [...]
 | producer_max_buffering | integer | optional    | 50000          | [1,...]    
           | `max_buffering` parameter in 
[lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) representing 
maximum buffer size. Unit is message count.                                     
                                                                                
                                                                                
                                                      [...]
 | producer_time_linger   | integer | optional    | 1              | [1,...]    
           | `flush_time` parameter in 
[lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) in seconds.    
                                                                                
                                                                                
                                                                                
                                                       [...]
+| meta_refresh_interval | integer | optional    | 30              | [1,...]    
           | `refresh_interval` parameter in 
[lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) specifies the 
time to auto refresh the metadata, in seconds.                                  
                                                                                
                                                                                
                                                  [...]
 
 This Plugin supports using batch processors to aggregate and process entries 
(logs/data) in a batch. This avoids the need for frequently submitting the 
data. The batch processor submits data every `5` seconds or when the data in 
the queue reaches `1000`. See [Batch 
Processor](../batch-processor.md#configuration) for more information or setting 
your custom configuration.
 
diff --git a/docs/zh/latest/plugins/kafka-logger.md 
b/docs/zh/latest/plugins/kafka-logger.md
index 5a6fd987b..ea68923d7 100644
--- a/docs/zh/latest/plugins/kafka-logger.md
+++ b/docs/zh/latest/plugins/kafka-logger.md
@@ -59,6 +59,7 @@ description: API 网关 Apache APISIX 的 kafka-logger 插件用于将日志作
 | producer_batch_size    | integer | 否     | 1048576        | [0,...]          
     | 对应 [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 中的 
`batch_size` 参数,单位为字节。 |
 | producer_max_buffering | integer | 否     | 50000          | [1,...]          
     | 对应 [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 中的 
`max_buffering` 参数,表示最大缓冲区,单位为条。 |
 | producer_time_linger   | integer | 否     | 1              | [1,...]          
     | 对应 [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 中的 
`flush_time` 参数,单位为秒。|
+| meta_refresh_interval | integer  | 否     | 30             | [1,...]          
     | 对应 [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 中的 
`refresh_interval` 参数,用于指定自动刷新 metadata 的间隔时长,单位为秒。 |
 
 该插件支持使用批处理器来聚合并批量处理条目(日志/数据)。这样可以避免插件频繁地提交数据,默认设置情况下批处理器会每 `5` 秒钟或队列中的数据达到 
`1000` 条时提交数据,如需了解批处理器相关参数设置,请参考 [Batch-Processor](../batch-processor.md#配置) 
配置部分。
 
diff --git a/t/plugin/kafka-logger2.t b/t/plugin/kafka-logger2.t
index 5c4e0169d..4aabf8756 100644
--- a/t/plugin/kafka-logger2.t
+++ b/t/plugin/kafka-logger2.t
@@ -895,3 +895,61 @@ hello world
 --- error_log eval
 qr/send data to kafka: \{.*"body":"abcdef"/
 --- wait: 2
+
+
+
+=== TEST 22: setup route with meta_refresh_interval
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [=[{
+                        "plugins": {
+                            "kafka-logger": {
+                                "brokers" :
+                                  [{
+                                    "host":"127.0.0.1",
+                                    "port": 9092
+                                  }],
+                                "kafka_topic" : "test2",
+                                "key" : "key1",
+                                "timeout" : 1,
+                                "meta_refresh_interval": 1,
+                                "batch_max_size": 1,
+                                "include_req_body": true
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]=]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+
+--- response_body
+passed
+
+
+
+=== TEST 23: hit route, send data to kafka successfully
+--- request
+POST /hello
+abcdef
+--- response_body
+hello world
+--- no_error_log
+[error]
+--- error_log eval
+qr/send data to kafka: \{.*"body":"abcdef"/
+--- wait: 2

Reply via email to