This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c793ed  feat(kafka-logger): add required_acks option (#4878)
8c793ed is described below

commit 8c793ed7630f04becd894fc23196a445f11505cd
Author: okaybase <[email protected]>
AuthorDate: Wed Aug 25 16:59:21 2021 +0800

    feat(kafka-logger): add required_acks option (#4878)
---
 apisix/plugins/kafka-logger.lua        |   6 ++
 docs/en/latest/plugins/kafka-logger.md |   1 +
 docs/zh/latest/plugins/kafka-logger.md |   1 +
 t/plugin/kafka-logger.t                | 135 +++++++++++++++++++++++++++++++++
 4 files changed, 143 insertions(+)

diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua
index a7788a0..f03c55b 100644
--- a/apisix/plugins/kafka-logger.lua
+++ b/apisix/plugins/kafka-logger.lua
@@ -52,6 +52,11 @@ local schema = {
             default = "async",
             enum = {"async", "sync"},
         },
+        required_acks = {
+            type = "integer",
+            default = 1,
+            enum = { 0, 1, -1 },
+        },
         key = {type = "string"},
         timeout = {type = "integer", minimum = 1, default = 3},
         name = {type = "string", default = "kafka logger"},
@@ -207,6 +212,7 @@ function _M.log(conf, ctx)
 
     broker_config["request_timeout"] = conf.timeout * 1000
     broker_config["producer_type"] = conf.producer_type
+    broker_config["required_acks"] = conf.required_acks
 
     local prod, err = core.lrucache.plugin_ctx(lrucache, ctx, nil, 
create_producer,
                                                broker_list, broker_config)
diff --git a/docs/en/latest/plugins/kafka-logger.md b/docs/en/latest/plugins/kafka-logger.md
index b6d1583..4da9a23 100644
--- a/docs/en/latest/plugins/kafka-logger.md
+++ b/docs/en/latest/plugins/kafka-logger.md
@@ -48,6 +48,7 @@ For more info on Batch-Processor in Apache APISIX please 
refer.
 | broker_list      | object  | required    |                |         | An 
array of Kafka brokers.                                                         
      |
 | kafka_topic      | string  | required    |                |         | Target 
 topic to push data.                                                            
  |
 | producer_type    | string  | optional    | async          | ["async", 
"sync"]        | Producer's mode of sending messages.          |
+| required_acks          | integer | optional    | 1              | [0, 1, -1]
| The number of acknowledgments the producer requires the leader to have
received before considering a request complete. This controls the durability of
records that are sent. The semantics are the same as the Kafka producer `acks`
setting (if `acks=0`, the producer will not wait for any acknowledgment from
the server at all; the record will be immediately added to the socket buffer
and considered sent. If `acks=1`, the leader will write the record to its local
log but will respond without awaiting full acknowledgment from all followers.
If `acks=-1`, the leader will wait for the full set of in-sync replicas to
acknowledge the record). |
 | key              | string  | optional    |                |         | Used 
for partition allocation of messages.                                           
    |
 | timeout          | integer | optional    | 3              | [1,...] | 
Timeout for the upstream to send data.                                          
         |
 | name             | string  | optional    | "kafka logger" |         | A
unique identifier to identify the batch processor.
       |
diff --git a/docs/zh/latest/plugins/kafka-logger.md b/docs/zh/latest/plugins/kafka-logger.md
index 21cec7b..05d6f46 100644
--- a/docs/zh/latest/plugins/kafka-logger.md
+++ b/docs/zh/latest/plugins/kafka-logger.md
@@ -46,6 +46,7 @@ title: kafka-logger
 | broker_list      | object  | 必须   |                |         | 要推送的 kafka 的 
broker 列表。                  |
 | kafka_topic      | string  | 必须   |                |         | 要推送的 topic。   
                              |
 | producer_type    | string  | 可选   | async          | ["async", "sync"]       
 | 生产者发送消息的模式。          |
+| required_acks          | integer | 可选    | 1              | [0, 1, -1] | 
生产者在确认一个请求发送完成之前需要收到的反馈信息的数量。这个参数是为了保证发送请求的可靠性。语义同 kafka 生产者的 acks 参数(如果设置 
`acks=0`,则 producer 不会等待服务器的反馈。该消息会被立刻添加到 socket buffer 中并认为已经发送完成。如果设置 
`acks=1`,leader 节点会将记录写入本地日志,并且在所有 follower 节点反馈之前就先确认成功。如果设置 `acks=-1`,这就意味着 
leader 节点会等待所有同步中的副本确认之后再确认这条记录是否发送完成。)。         |
 | key              | string  | 可选   |                |         | 用于消息的分区分配。    
                         |
 | timeout          | integer | 可选   | 3              | [1,...] | 发送数据的超时时间。    
                         |
 | name             | string  | 可选   | "kafka logger" |         | batch 
processor 的唯一标识。                     |
diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t
index 46a520c..bf78ef2 100644
--- a/t/plugin/kafka-logger.t
+++ b/t/plugin/kafka-logger.t
@@ -722,3 +722,138 @@ GET /t
 [qr/partition_id: 1/,
 qr/partition_id: 0/,
 qr/partition_id: 2/]
+
+
+
+=== TEST 20: required_acks, matches none of the enum values
+--- config
+    location /t {
+        content_by_lua_block {
+            local plugin = require("apisix.plugins.kafka-logger")
+            local ok, err = plugin.check_schema({
+                broker_list = {
+                    ["127.0.0.1"] = 3000
+                },
+                required_acks = 10,
+                kafka_topic ="test",
+                key= "key1"
+            })
+            if not ok then
+                ngx.say(err)
+            end
+            ngx.say("done")
+        }
+    }
+--- request
+GET /t
+--- response_body
+property "required_acks" validation failed: matches none of the enum values
+done
+--- no_error_log
+[error]
+
+
+
+=== TEST 21: report log to kafka, with required_acks(1, 0, -1)
+--- config
+location /t {
+    content_by_lua_block {
+        local data = {
+            {
+                input = {
+                    plugins = {
+                        ["kafka-logger"] = {
+                            broker_list = {
+                                ["127.0.0.1"] = 9092
+                            },
+                            kafka_topic = "test2",
+                            producer_type = "sync",
+                            timeout = 1,
+                            batch_max_size = 1,
+                            required_acks = 1,
+                            meta_format = "origin",
+                        }
+                    },
+                    upstream = {
+                        nodes = {
+                            ["127.0.0.1:1980"] = 1
+                        },
+                        type = "roundrobin"
+                    },
+                    uri = "/hello",
+                },
+            },
+            {
+                input = {
+                    plugins = {
+                        ["kafka-logger"] = {
+                            broker_list = {
+                                ["127.0.0.1"] = 9092
+                            },
+                            kafka_topic = "test2",
+                            producer_type = "sync",
+                            timeout = 1,
+                            batch_max_size = 1,
+                            required_acks = -1,
+                            meta_format = "origin",
+                        }
+                    },
+                    upstream = {
+                        nodes = {
+                            ["127.0.0.1:1980"] = 1
+                        },
+                        type = "roundrobin"
+                    },
+                    uri = "/hello",
+                },
+            },
+            {
+                input = {
+                    plugins = {
+                        ["kafka-logger"] = {
+                            broker_list = {
+                                ["127.0.0.1"] = 9092
+                            },
+                            kafka_topic = "test2",
+                            producer_type = "sync",
+                            timeout = 1,
+                            batch_max_size = 1,
+                            required_acks = 0,
+                            meta_format = "origin",
+                        }
+                    },
+                    upstream = {
+                        nodes = {
+                            ["127.0.0.1:1980"] = 1
+                        },
+                        type = "roundrobin"
+                    },
+                    uri = "/hello",
+                },
+            },
+        }
+
+        local t = require("lib.test_admin").test
+        local err_count = 0
+        for i in ipairs(data) do
+            local code, body = t('/apisix/admin/routes/1', ngx.HTTP_PUT, 
data[i].input)
+
+            if code >= 300 then
+                err_count = err_count + 1
+            end
+            ngx.print(body)
+
+            t('/hello', ngx.HTTP_GET)
+        end
+
+        assert(err_count == 0)
+    }
+}
+--- request
+GET /t
+--- no_error_log
+[error]
+--- error_log
+send data to kafka: GET /hello
+send data to kafka: GET /hello
+send data to kafka: GET /hello

Reply via email to