This is an automated email from the ASF dual-hosted git repository.
spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new d3d4778 refactor(kafka-logger): identify the broker ahead of time (#4863)
d3d4778 is described below
commit d3d477861069e53d359d47c6889189fbc2455618
Author: okaybase <[email protected]>
AuthorDate: Thu Aug 26 13:38:51 2021 +0800
refactor(kafka-logger): identify the broker ahead of time (#4863)
---
apisix/plugins/kafka-logger.lua | 28 +++++++++--------
t/plugin/kafka-logger.t | 67 +++++++++++++++++++++++++++++++++++++++++
2 files changed, 82 insertions(+), 13 deletions(-)
diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua
index 763e66c..69495ed 100644
--- a/apisix/plugins/kafka-logger.lua
+++ b/apisix/plugins/kafka-logger.lua
@@ -43,7 +43,16 @@ local schema = {
enum = {"default", "origin"},
},
broker_list = {
- type = "object"
+ type = "object",
+ minProperties = 1,
+ patternProperties = {
+ [".*"] = {
+ description = "the port of kafka broker",
+ type = "integer",
+ minimum = 1,
+ maximum = 65535,
+ },
+ },
},
kafka_topic = {type = "string"},
producer_type = {
@@ -148,10 +157,6 @@ end
local function send_kafka_data(conf, log_message, prod)
- if core.table.nkeys(conf.broker_list) == 0 then
- core.log.error("failed to identify the broker specified")
- end
-
local ok, err = prod:send(conf.kafka_topic, conf.key, log_message)
core.log.info("partition_id: ",
core.log.delay_exec(get_partition_id,
@@ -202,14 +207,11 @@ function _M.log(conf, ctx)
local broker_config = {}
for host, port in pairs(conf.broker_list) do
- if type(host) == 'string'
- and type(port) == 'number' then
- local broker = {
- host = host,
- port = port
- }
- core.table.insert(broker_list, broker)
- end
+ local broker = {
+ host = host,
+ port = port
+ }
+ core.table.insert(broker_list, broker)
end
broker_config["request_timeout"] = conf.timeout * 1000
diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t
index d903615..34f61b7 100644
--- a/t/plugin/kafka-logger.t
+++ b/t/plugin/kafka-logger.t
@@ -999,3 +999,70 @@ GET /t
passed
--- error_log eval
qr/not found topic, retryable: true, topic: undefined_topic, partition_id: -1/
+
+
+
+=== TEST 24: check broker_list via schema
+--- config
+ location /t {
+ content_by_lua_block {
+ local data = {
+ {
+ input = {
+ broker_list = {},
+ kafka_topic = "test",
+ key= "key1",
+ },
+ },
+ {
+ input = {
+ broker_list = {
+ ["127.0.0.1"] = "9092"
+ },
+ kafka_topic = "test",
+ key= "key1",
+ },
+ },
+ {
+ input = {
+ broker_list = {
+ ["127.0.0.1"] = 0
+ },
+ kafka_topic = "test",
+ key= "key1",
+ },
+ },
+ {
+ input = {
+ broker_list = {
+ ["127.0.0.1"] = 65536
+ },
+ kafka_topic = "test",
+ key= "key1",
+ },
+ },
+ }
+
+ local plugin = require("apisix.plugins.kafka-logger")
+
+ local err_count = 0
+ for i in ipairs(data) do
+ local ok, err = plugin.check_schema(data[i].input)
+ if not ok then
+ err_count = err_count + 1
+ ngx.say(err)
+ end
+ end
+
+ assert(err_count == #data)
+ }
+ }
+--- request
+GET /t
+--- response_body
+property "broker_list" validation failed: expect object to have at least 1 properties
+property "broker_list" validation failed: failed to validate 127.0.0.1 (matching ".*"): wrong type: expected integer, got string
+property "broker_list" validation failed: failed to validate 127.0.0.1 (matching ".*"): expected 0 to be greater than 1
+property "broker_list" validation failed: failed to validate 127.0.0.1 (matching ".*"): expected 65536 to be smaller than 65535
+--- no_error_log
+[error]