This is an automated email from the ASF dual-hosted git repository.
nic-6443 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new 86ac31202 fix(kafka-logger): support api_version so brokers can store message timestamps (#13521)
86ac31202 is described below
commit 86ac3120282889da9870718a454b6c60b3ddf2f6
Author: Nic <[email protected]>
AuthorDate: Fri Jun 12 10:48:58 2026 +0800
fix(kafka-logger): support api_version so brokers can store message timestamps (#13521)
---
apisix/plugins/kafka-logger.lua | 8 +++
docs/en/latest/plugins/kafka-logger.md | 1 +
docs/zh/latest/plugins/kafka-logger.md | 1 +
t/plugin/kafka-logger.t | 111 +++++++++++++++++++++++++++++++++
4 files changed, 121 insertions(+)
diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua
index 318d21e92..014898c5e 100644
--- a/apisix/plugins/kafka-logger.lua
+++ b/apisix/plugins/kafka-logger.lua
@@ -128,6 +128,13 @@ local schema = {
producer_max_buffering = {type = "integer", minimum = 1, default = 50000},
producer_time_linger = {type = "integer", minimum = 1, default = 1},
meta_refresh_interval = {type = "integer", minimum = 1, default = 30},
+ -- send message with the Produce API version, only version 2 carries
+ -- the message timestamp, so that brokers can store it
+ api_version = {
+ type = "integer",
+ default = 1,
+ enum = {0, 1, 2},
+ },
},
oneOf = {
{ required = {"broker_list", "kafka_topic"},},
@@ -267,6 +274,7 @@ function _M.log(conf, ctx)
broker_config["max_buffering"] = conf.producer_max_buffering
broker_config["flush_time"] = conf.producer_time_linger * 1000
broker_config["refresh_interval"] = conf.meta_refresh_interval * 1000
+ broker_config["api_version"] = conf.api_version
local prod, err = core.lrucache.plugin_ctx(lrucache, ctx, nil,
create_producer,
broker_list, broker_config,
conf.cluster_name)
diff --git a/docs/en/latest/plugins/kafka-logger.md b/docs/en/latest/plugins/kafka-logger.md
index f7d208fa5..defd99397 100644
--- a/docs/en/latest/plugins/kafka-logger.md
+++ b/docs/en/latest/plugins/kafka-logger.md
@@ -69,6 +69,7 @@ It might take some time to receive the log data. It will be automatically sent a
| producer_max_buffering | integer | False | 50000 | [1,...] | `max_buffering` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) representing maximum buffer size. Unit is message count. |
| producer_time_linger | integer | False | 1 | [1,...] | `flush_time` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) in seconds. |
| meta_refresh_interval | integer | False | 30 | [1,...] | `refresh_interval` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) that specifies the interval to auto-refresh the metadata, in seconds. |
+| api_version | integer | False | 1 | [0, 1, 2] | `api_version` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) that specifies the version of the Kafka Produce API. Set to `2` to make brokers store message timestamps; otherwise the timestamps are stored as `-1` (shown as `1970-01-01` in some consumers). Requires Kafka 0.10 or later. |
This Plugin supports using batch processors to aggregate and process entries (logs/data) in a batch. This avoids the need for frequently submitting the data. The batch processor submits data every `5` seconds or when the data in the queue reaches `1000`. See [Batch Processor](../batch-processor.md#configuration) for more information or setting your custom configuration.
diff --git a/docs/zh/latest/plugins/kafka-logger.md b/docs/zh/latest/plugins/kafka-logger.md
index 3b3623356..b67fa3710 100644
--- a/docs/zh/latest/plugins/kafka-logger.md
+++ b/docs/zh/latest/plugins/kafka-logger.md
@@ -69,6 +69,7 @@ description: kafka-logger 插件将请求和响应日志作为 JSON 对象批量
| producer_max_buffering | integer | 否 | 50000 | [1,...] | 对应 [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 中的 `max_buffering` 参数,表示最大缓冲区大小,单位为条。 |
| producer_time_linger | integer | 否 | 1 | [1,...] | 对应 [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 中的 `flush_time` 参数,单位为秒。 |
| meta_refresh_interval | integer | 否 | 30 | [1,...] | 对应 [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 中的 `refresh_interval` 参数,用于指定自动刷新 metadata 的间隔时长,单位为秒。 |
+| api_version | integer | 否 | 1 | [0, 1, 2] | 对应 [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 中的 `api_version` 参数,用于指定 Kafka Produce API 的版本。设置为 `2` 时 broker 才会存储消息的时间戳,否则时间戳为 `-1`(部分消费端会显示为 `1970-01-01`)。需要 Kafka 0.10 及以上版本。 |
该插件支持使用批处理器来聚合并批量处理条目(日志/数据),避免频繁提交数据。默认情况下,批处理器每 `5` 秒或队列中的数据达到 `1000` 条时提交数据。如需了解批处理器相关参数设置,请参考[批处理器](../batch-processor.md#配置)配置部分。
diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t
index 7d8481917..6e355ba81 100644
--- a/t/plugin/kafka-logger.t
+++ b/t/plugin/kafka-logger.t
@@ -875,3 +875,114 @@ passed
qr/creating new batch processor with config.*/
--- grep_error_log_out eval
qr/creating new batch processor with config.*/
+
+
+
+=== TEST 28: check api_version schema: 2 is accepted, 3 is rejected
+--- config
+ location /t {
+ content_by_lua_block {
+ local plugin = require("apisix.plugins.kafka-logger")
+ local ok, err = plugin.check_schema({
+ broker_list = {
+ ["127.0.0.1"] = 9092
+ },
+ kafka_topic = "test",
+ api_version = 2
+ })
+ if not ok then
+ ngx.say(err)
+ end
+
+ local ok, err = plugin.check_schema({
+ broker_list = {
+ ["127.0.0.1"] = 9092
+ },
+ kafka_topic = "test",
+ api_version = 3
+ })
+ if not ok then
+ ngx.say(err)
+ end
+ ngx.say("done")
+ }
+ }
+--- response_body
+property "api_version" validation failed: matches none of the enum values
+done
+
+
+
+=== TEST 29: report log to kafka with api_version = 2, the broker should store the message timestamp
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "plugins": {
+ "kafka-logger": {
+ "broker_list" : {
+ "127.0.0.1":9092
+ },
+ "kafka_topic" : "test2",
+ "producer_type": "sync",
+ "timeout" : 1,
+ "batch_max_size": 1,
+ "api_version": 2
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1980": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/hello"
+ }]]
+ )
+ if code >= 300 then
+ ngx.status = code
+ ngx.say(body)
+ return
+ end
+ ngx.sleep(0.5)
+
+ -- record the current end offset of the topic before sending the log
+ local bconsumer = require("resty.kafka.basic-consumer")
+ local pconsumer = require("resty.kafka.protocol.consumer")
+ local broker_list = {{host = "127.0.0.1", port = 9092}}
+ local consumer = bconsumer:new(broker_list, {})
+ local offset, err = consumer:list_offset("test2", 0,
+ pconsumer.LIST_OFFSET_TIMESTAMP_LAST)
+ if not offset then
+ ngx.say("failed to list offset: ", err)
+ return
+ end
+ offset = tonumber(tostring(offset):match("^%-?%d+"))
+
+ -- hit the route to send the log to kafka
+ t('/hello', ngx.HTTP_GET)
+ ngx.sleep(2)
+
+ local data, err = consumer:fetch("test2", 0, offset)
+ if not data then
+ ngx.say("failed to fetch message: ", err)
+ return
+ end
+ local message = data.records[1]
+ if not message then
+ ngx.say("no message fetched")
+ return
+ end
+ if tonumber(message.timestamp) > 0 then
+ ngx.say("message timestamp is stored")
+ else
+ ngx.say("invalid message timestamp: ", tostring(message.timestamp))
+ end
+ }
+ }
+--- timeout: 10
+--- response_body
+message timestamp is stored