This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new d3d4778  refactor(kafka-logger): identify the broker ahead of time 
(#4863)
d3d4778 is described below

commit d3d477861069e53d359d47c6889189fbc2455618
Author: okaybase <[email protected]>
AuthorDate: Thu Aug 26 13:38:51 2021 +0800

    refactor(kafka-logger): identify the broker ahead of time (#4863)
---
 apisix/plugins/kafka-logger.lua | 28 +++++++++--------
 t/plugin/kafka-logger.t         | 67 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 82 insertions(+), 13 deletions(-)

diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua
index 763e66c..69495ed 100644
--- a/apisix/plugins/kafka-logger.lua
+++ b/apisix/plugins/kafka-logger.lua
@@ -43,7 +43,16 @@ local schema = {
             enum = {"default", "origin"},
         },
         broker_list = {
-            type = "object"
+            type = "object",
+            minProperties = 1,
+            patternProperties = {
+                [".*"] = {
+                    description = "the port of kafka broker",
+                    type = "integer",
+                    minimum = 1,
+                    maximum = 65535,
+                },
+            },
         },
         kafka_topic = {type = "string"},
         producer_type = {
@@ -148,10 +157,6 @@ end
 
 
 local function send_kafka_data(conf, log_message, prod)
-    if core.table.nkeys(conf.broker_list) == 0 then
-        core.log.error("failed to identify the broker specified")
-    end
-
     local ok, err = prod:send(conf.kafka_topic, conf.key, log_message)
     core.log.info("partition_id: ",
                   core.log.delay_exec(get_partition_id,
@@ -202,14 +207,11 @@ function _M.log(conf, ctx)
     local broker_config = {}
 
     for host, port in pairs(conf.broker_list) do
-        if type(host) == 'string'
-                and type(port) == 'number' then
-            local broker = {
-                host = host,
-                port = port
-            }
-            core.table.insert(broker_list, broker)
-        end
+        local broker = {
+            host = host,
+            port = port
+        }
+        core.table.insert(broker_list, broker)
     end
 
     broker_config["request_timeout"] = conf.timeout * 1000
diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t
index d903615..34f61b7 100644
--- a/t/plugin/kafka-logger.t
+++ b/t/plugin/kafka-logger.t
@@ -999,3 +999,70 @@ GET /t
 passed
 --- error_log eval
 qr/not found topic, retryable: true, topic: undefined_topic, partition_id: -1/
+
+
+
+=== TEST 24: check broker_list via schema
+--- config
+    location /t {
+        content_by_lua_block {
+            local data = {
+                {
+                    input = {
+                        broker_list = {},
+                        kafka_topic = "test",
+                        key= "key1",
+                    },
+                },
+                {
+                    input = {
+                        broker_list = {
+                            ["127.0.0.1"] = "9092"
+                        },
+                        kafka_topic = "test",
+                        key= "key1",
+                    },
+                },
+                {
+                    input = {
+                        broker_list = {
+                            ["127.0.0.1"] = 0
+                        },
+                        kafka_topic = "test",
+                        key= "key1",
+                    },
+                },
+                {
+                    input = {
+                        broker_list = {
+                            ["127.0.0.1"] = 65536
+                        },
+                        kafka_topic = "test",
+                        key= "key1",
+                    },
+                },
+            }
+
+            local plugin = require("apisix.plugins.kafka-logger")
+
+            local err_count = 0
+            for i in ipairs(data) do
+                local ok, err = plugin.check_schema(data[i].input)
+                if not ok then
+                    err_count = err_count + 1
+                    ngx.say(err)
+                end
+            end
+
+            assert(err_count == #data)
+        }
+    }
+--- request
+GET /t
+--- response_body
+property "broker_list" validation failed: expect object to have at least 1 
properties
+property "broker_list" validation failed: failed to validate 127.0.0.1 
(matching ".*"): wrong type: expected integer, got string
+property "broker_list" validation failed: failed to validate 127.0.0.1 
(matching ".*"): expected 0 to be greater than 1
+property "broker_list" validation failed: failed to validate 127.0.0.1 
(matching ".*"): expected 65536 to be smaller than 65535
+--- no_error_log
+[error]

Reply via email to