This is an automated email from the ASF dual-hosted git repository.
wenming pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new ea691046e feat(kafka-logger): add max req/resp body size attributes
(#11133)
ea691046e is described below
commit ea691046ecf9790695068387ae3ef7732355e098
Author: Abhishek Choudhary <[email protected]>
AuthorDate: Tue Apr 16 08:39:01 2024 +0545
feat(kafka-logger): add max req/resp body size attributes (#11133)
---
apisix/core/response.lua | 16 +-
apisix/plugins/kafka-logger.lua | 30 ++
apisix/utils/log-util.lua | 67 ++-
docs/en/latest/plugins/kafka-logger.md | 2 +
t/plugin/kafka-logger-large-body.t | 869 +++++++++++++++++++++++++++++++++
5 files changed, 966 insertions(+), 18 deletions(-)
diff --git a/apisix/core/response.lua b/apisix/core/response.lua
index 04430abd5..baee97749 100644
--- a/apisix/core/response.lua
+++ b/apisix/core/response.lua
@@ -176,7 +176,7 @@ end
-- final_body = transform(final_body)
-- ngx.arg[1] = final_body
-- ...
-function _M.hold_body_chunk(ctx, hold_the_copy)
+function _M.hold_body_chunk(ctx, hold_the_copy, max_resp_body_bytes)
local body_buffer
local chunk, eof = arg[1], arg[2]
@@ -192,22 +192,32 @@ function _M.hold_body_chunk(ctx, hold_the_copy)
n = 1
}
ctx._body_buffer[ctx._plugin_name] = body_buffer
+ ctx._resp_body_bytes = #chunk
else
local n = body_buffer.n + 1
body_buffer.n = n
body_buffer[n] = chunk
+ ctx._resp_body_bytes = ctx._resp_body_bytes + #chunk
+ end
+ if max_resp_body_bytes and ctx._resp_body_bytes >= max_resp_body_bytes
then
+ local body_data = concat_tab(body_buffer, "", 1, body_buffer.n)
+ body_data = str_sub(body_data, 1, max_resp_body_bytes)
+ return body_data
end
end
if eof then
body_buffer = ctx._body_buffer[ctx._plugin_name]
if not body_buffer then
+ if max_resp_body_bytes and #chunk >= max_resp_body_bytes then
+ chunk = str_sub(chunk, 1, max_resp_body_bytes)
+ end
return chunk
end
- body_buffer = concat_tab(body_buffer, "", 1, body_buffer.n)
+ local body_data = concat_tab(body_buffer, "", 1, body_buffer.n)
ctx._body_buffer[ctx._plugin_name] = nil
- return body_buffer
+ return body_data
end
if not hold_the_copy then
diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua
index ca004e9cc..adeec2921 100644
--- a/apisix/plugins/kafka-logger.lua
+++ b/apisix/plugins/kafka-logger.lua
@@ -14,6 +14,7 @@
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
+local expr = require("resty.expr.v1")
local core = require("apisix.core")
local log_util = require("apisix.utils.log-util")
local producer = require ("resty.kafka.producer")
@@ -22,6 +23,7 @@ local bp_manager_mod =
require("apisix.utils.batch-processor-manager")
local math = math
local pairs = pairs
local type = type
+local req_read_body = ngx.req.read_body
local plugin_name = "kafka-logger"
local batch_processor_manager = bp_manager_mod.new("kafka logger")
@@ -115,6 +117,8 @@ local schema = {
type = "array"
}
},
+ max_req_body_bytes = {type = "integer", minimum = 1, default = 524288},
+ max_resp_body_bytes = {type = "integer", minimum = 1, default =
524288},
-- in lua-resty-kafka, cluster_name is defined as number
-- see https://github.com/doujiang24/lua-resty-kafka#new-1
cluster_name = {type = "integer", minimum = 1, default = 1},
@@ -210,6 +214,32 @@ local function send_kafka_data(conf, log_message, prod)
end
+function _M.access(conf, ctx)
+ if conf.include_req_body then
+ local should_read_body = true
+ if conf.include_req_body_expr then
+ if not conf.request_expr then
+ local request_expr, err = expr.new(conf.include_req_body_expr)
+ if not request_expr then
+ core.log.error('generate request expr err ', err)
+ return
+ end
+ conf.request_expr = request_expr
+ end
+
+ local result = conf.request_expr:eval(ctx.var)
+
+ if not result then
+ should_read_body = false
+ end
+ end
+ if should_read_body then
+ req_read_body()
+ end
+ end
+end
+
+
function _M.body_filter(conf, ctx)
log_util.collect_body(conf, ctx)
end
diff --git a/apisix/utils/log-util.lua b/apisix/utils/log-util.lua
index a3ff834ee..e53daca80 100644
--- a/apisix/utils/log-util.lua
+++ b/apisix/utils/log-util.lua
@@ -24,10 +24,15 @@ local ngx_now = ngx.now
local ngx_header = ngx.header
local os_date = os.date
local str_byte = string.byte
+local str_sub = string.sub
local math_floor = math.floor
local ngx_update_time = ngx.update_time
local req_get_body_data = ngx.req.get_body_data
local is_http = ngx.config.subsystem == "http"
+local req_get_body_file = ngx.req.get_body_file
+local MAX_REQ_BODY = 524288 -- 512 KiB
+local MAX_RESP_BODY = 524288 -- 512 KiB
+local io = io
local lru_log_format = core.lrucache.new({
ttl = 300, count = 512
@@ -36,6 +41,34 @@ local lru_log_format = core.lrucache.new({
local _M = {}
+local function get_request_body(max_bytes)
+ local req_body = req_get_body_data()
+ if req_body then
+ if max_bytes and #req_body >= max_bytes then
+ req_body = str_sub(req_body, 1, max_bytes)
+ end
+ return req_body
+ end
+
+ local file_name = req_get_body_file()
+ if not file_name then
+ return nil
+ end
+
+ core.log.info("attempt to read body from file: ", file_name)
+
+ local f, err = io.open(file_name, 'r')
+ if not f then
+ return nil, "fail to open file " .. err
+ end
+
+ req_body = f:read(max_bytes)
+ f:close()
+
+ return req_body
+end
+
+
local function gen_log_format(format)
local log_format = {}
for k, var_name in pairs(format) do
@@ -181,15 +214,13 @@ local function get_full_log(ngx, conf)
end
if log_request_body then
- local body = req_get_body_data()
- if body then
- log.request.body = body
- else
- local body_file = ngx.req.get_body_file()
- if body_file then
- log.request.body_file = body_file
- end
+ local max_req_body_bytes = conf.max_req_body_bytes or MAX_REQ_BODY
+ local body, err = get_request_body(max_req_body_bytes)
+ if err then
+ core.log.error("fail to get request body: ", err)
+ return
end
+ log.request.body = body
end
end
@@ -252,20 +283,21 @@ end
function _M.get_req_original(ctx, conf)
- local headers = {
+ local data = {
ctx.var.request, "\r\n"
}
for k, v in pairs(ngx.req.get_headers()) do
- core.table.insert_tail(headers, k, ": ", v, "\r\n")
+ core.table.insert_tail(data, k, ": ", v, "\r\n")
end
- -- core.log.error("headers: ", core.table.concat(headers, ""))
- core.table.insert(headers, "\r\n")
+ core.table.insert(data, "\r\n")
if conf.include_req_body then
- core.table.insert(headers, ctx.var.request_body)
+ local max_req_body_bytes = conf.max_req_body_bytes or MAX_REQ_BODY
+ local req_body = get_request_body(max_req_body_bytes)
+ core.table.insert(data, req_body)
end
- return core.table.concat(headers, "")
+ return core.table.concat(data, "")
end
@@ -310,7 +342,12 @@ function _M.collect_body(conf, ctx)
end
if log_response_body then
- local final_body = core.response.hold_body_chunk(ctx, true)
+ local max_resp_body_bytes = conf.max_resp_body_bytes or
MAX_RESP_BODY
+
+ if ctx._resp_body_bytes and ctx._resp_body_bytes >=
max_resp_body_bytes then
+ return
+ end
+ local final_body = core.response.hold_body_chunk(ctx, true,
max_resp_body_bytes)
if not final_body then
return
end
diff --git a/docs/en/latest/plugins/kafka-logger.md
b/docs/en/latest/plugins/kafka-logger.md
index ecf087c17..fa70eb129 100644
--- a/docs/en/latest/plugins/kafka-logger.md
+++ b/docs/en/latest/plugins/kafka-logger.md
@@ -55,8 +55,10 @@ It might take some time to receive the log data. It will be
automatically sent a
| log_format | object | False | | | Log format declared as
key value pairs in JSON format. Values only support strings.
[APISIX](../apisix-variable.md) or
[Nginx](http://nginx.org/en/docs/varindex.html) variables can be used by
prefixing the string with `$`. |
| include_req_body | boolean | False | false | [false, true]
| When set to `true` includes the request body in the log. If the
request body is too big to be kept in the memory, it can't be logged due to
Nginx's limitations.
|
| include_req_body_expr | array | False | |
| Filter for when the `include_req_body` attribute is set to `true`.
Request body is only logged when the expression set here evaluates to `true`.
See [lua-resty-expr](https://github.com/api7/lua-resty-expr) for more.
|
+| max_req_body_bytes | integer | False | 524288 | >=1
| Request bodies within this size will be pushed to kafka, if the size
exceeds the configured value it will be truncated before pushing to Kafka.
|
| include_resp_body | boolean | False | false | [false, true]
| When set to `true` includes the response body in the log.
|
| include_resp_body_expr | array | False | |
| Filter for when the `include_resp_body` attribute is set to `true`.
Response body is only logged when the expression set here evaluates to `true`.
See [lua-resty-expr](https://github.com/api7/lua-resty-expr) for more.
|
+| max_resp_body_bytes | integer | False | 524288 | >=1
| Response bodies within this size will be pushed to kafka, if the size
exceeds the configured value it will be truncated before pushing to Kafka.
|
| cluster_name | integer | False | 1 | [0,...]
| Name of the cluster. Used when there are two or more Kafka clusters.
Only works if the `producer_type` attribute is set to `async`.
|
| producer_batch_num | integer | optional | 200 | [1,...]
| `batch_num` parameter in
[lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka). The merge
message and batch is send to the server. Unit is message count.
[...]
| producer_batch_size | integer | optional | 1048576 | [0,...]
| `batch_size` parameter in
[lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) in bytes.
[...]
diff --git a/t/plugin/kafka-logger-large-body.t
b/t/plugin/kafka-logger-large-body.t
new file mode 100644
index 000000000..e86c64591
--- /dev/null
+++ b/t/plugin/kafka-logger-large-body.t
@@ -0,0 +1,869 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+no_long_string();
+no_root_location();
+
+add_block_preprocessor(sub {
+ my ($block) = @_;
+
+ if (!$block->request) {
+ $block->set_value("request", "GET /t");
+ }
+
+ my $http_config = $block->http_config // <<_EOC_;
+ # fake server, only for test
+ server {
+ listen 1970;
+ location /large_resp {
+ content_by_lua_block {
+ local large_body = {
+ "h", "e", "l", "l", "o"
+ }
+
+ local size_in_bytes = 1024 * 1024 -- 1mb
+ for i = 1, size_in_bytes do
+ large_body[i+5] = "l"
+ end
+ large_body = table.concat(large_body, "")
+
+ ngx.say(large_body)
+ }
+ }
+ }
+_EOC_
+
+ $block->set_value("http_config", $http_config);
+});
+
+run_tests;
+
+__DATA__
+
+=== TEST 1: max_req_body_bytes is not an integer
+--- config
+ location /t {
+ content_by_lua_block {
+ local plugin = require("apisix.plugins.kafka-logger")
+ local ok, err = plugin.check_schema({
+ broker_list= {
+ ["127.0.0.1"] = 9092
+ },
+ kafka_topic = "test2",
+ key = "key1",
+ timeout = 1,
+ batch_max_size = 1,
+ max_req_body_bytes = "10",
+ include_req_body = true,
+ meta_format = "origin"
+ })
+ if not ok then
+ ngx.say(err)
+ end
+ ngx.say("done")
+ }
+ }
+--- response_body
+property "max_req_body_bytes" validation failed: wrong type: expected integer,
got string
+done
+
+
+
+=== TEST 2: set route(meta_format = origin, include_req_body = true)
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "plugins": {
+ "kafka-logger": {
+ "broker_list" : {
+ "127.0.0.1":9092
+ },
+ "kafka_topic" : "test2",
+ "key" : "key1",
+ "timeout" : 1,
+ "batch_max_size": 1,
+ "max_req_body_bytes": 5,
+ "include_req_body": true,
+ "meta_format": "origin"
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1980": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/hello"
+ }]]
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- response_body
+passed
+
+
+
+=== TEST 3: hit route(meta_format = origin, include_req_body = true)
+--- request
+GET /hello?ab=cd
+abcdef
+--- response_body
+hello world
+--- error_log
+send data to kafka: GET /hello?ab=cd HTTP/1.1
+host: localhost
+content-length: 6
+connection: close
+abcde
+--- wait: 2
+
+
+
+=== TEST 4: set route(meta_format = default, include_req_body = true)
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "plugins": {
+ "kafka-logger": {
+ "broker_list" : {
+ "127.0.0.1":9092
+ },
+ "kafka_topic" : "test2",
+ "key" : "key1",
+ "timeout" : 1,
+ "batch_max_size": 1,
+ "max_req_body_bytes": 5,
+ "include_req_body": true
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1980": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/hello"
+ }]]
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- response_body
+passed
+
+
+
+=== TEST 5: hit route(meta_format = default, include_req_body = true)
+--- request
+GET /hello?ab=cd
+abcdef
+--- response_body
+hello world
+--- error_log_like eval
+qr/"body": "abcde"/
+--- wait: 2
+
+
+
+=== TEST 6: set route(id: 1, meta_format = default, include_resp_body = true)
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [=[{
+ "plugins": {
+ "kafka-logger": {
+ "broker_list" :
+ {
+ "127.0.0.1":9092
+ },
+ "kafka_topic" : "test2",
+ "key" : "key1",
+ "timeout" : 1,
+ "max_resp_body_bytes": 5,
+ "include_resp_body": true,
+ "batch_max_size": 1
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1980": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/hello"
+ }]=]
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+
+--- response_body
+passed
+
+
+
+=== TEST 7: hit route(meta_format = default, include_resp_body = true)
+--- request
+POST /hello?name=qwerty
+abcdef
+--- response_body
+hello world
+--- error_log eval
+qr/send data to kafka: \{.*"body":"hello"/
+--- wait: 2
+
+
+
+=== TEST 8: set route(id: 1, meta_format = origin, include_resp_body = true)
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [=[{
+ "plugins": {
+ "kafka-logger": {
+ "broker_list" :
+ {
+ "127.0.0.1":9092
+ },
+ "kafka_topic" : "test2",
+ "key" : "key1",
+ "timeout" : 1,
+ "meta_format": "origin",
+ "include_resp_body": true,
+ "max_resp_body_bytes": 5,
+ "batch_max_size": 1
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1980": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/hello"
+ }]=]
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+
+--- response_body
+passed
+
+
+
+=== TEST 9: hit route(meta_format = origin, include_resp_body = true)
+--- request
+POST /hello?name=qwerty
+abcdef
+--- response_body
+hello world
+--- error_log
+send data to kafka: POST /hello?name=qwerty HTTP/1.1
+host: localhost
+content-length: 6
+connection: close
+--- wait: 2
+
+
+
+=== TEST 10: set route(id: 1, meta_format = default, include_resp_body = true,
include_req_body = true)
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [=[{
+ "plugins": {
+ "kafka-logger": {
+ "broker_list" :
+ {
+ "127.0.0.1":9092
+ },
+ "kafka_topic" : "test2",
+ "key" : "key1",
+ "timeout" : 1,
+ "meta_format": "default",
+ "include_req_body": true,
+ "max_req_body_bytes": 5,
+ "include_resp_body": true,
+ "max_resp_body_bytes": 5,
+ "batch_max_size": 1
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1980": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/hello"
+ }]=]
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+
+--- response_body
+passed
+
+
+
+=== TEST 11: hit route(meta_format = default, include_resp_body = true,
include_req_body = true)
+--- request
+POST /hello?name=qwerty
+abcdef
+--- response_body
+hello world
+--- error_log eval
+qr/send data to kafka: \{.*"body":"abcde"/
+--- error_log_like
+*"body":"hello"
+--- wait: 2
+
+
+
+=== TEST 12: set route(id: 1, meta_format = default, include_resp_body =
false, include_req_body = false)
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [=[{
+ "plugins": {
+ "kafka-logger": {
+ "broker_list" :
+ {
+ "127.0.0.1":9092
+ },
+ "kafka_topic" : "test2",
+ "key" : "key1",
+ "timeout" : 1,
+ "meta_format": "default",
+ "batch_max_size": 1
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1980": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/hello"
+ }]=]
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+
+--- response_body
+passed
+
+
+
+=== TEST 13: hit route(meta_format = default, include_resp_body = false,
include_req_body = false)
+--- request
+POST /hello?name=qwerty
+abcdef
+--- response_body
+hello world
+--- no_error_log eval
+qr/send data to kafka: \{.*"body":.*/
+--- wait: 2
+
+
+
+=== TEST 14: set route(large_body, meta_format = default, include_resp_body =
true, include_req_body = true)
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [=[{
+ "plugins": {
+ "kafka-logger": {
+ "broker_list" :
+ {
+ "127.0.0.1":9092
+ },
+ "kafka_topic" : "test2",
+ "key" : "key1",
+ "timeout" : 1,
+ "meta_format": "default",
+ "include_req_body": true,
+ "max_req_body_bytes": 256,
+ "include_resp_body": true,
+ "max_resp_body_bytes": 256,
+ "batch_max_size": 1
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1980": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/echo"
+ }]=]
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+
+--- response_body
+passed
+
+
+
+=== TEST 15: hit route(large_body, meta_format = default, include_resp_body =
true, include_req_body = true)
+--- config
+ location /t {
+ content_by_lua_block {
+ local core = require("apisix.core")
+ local t = require("lib.test_admin")
+ local http = require("resty.http")
+
+ local large_body = {
+ "h", "e", "l", "l", "o"
+ }
+
+ local size_in_bytes = 10 * 1024 -- 10kb
+ for i = 1, size_in_bytes do
+ large_body[i+5] = "l"
+ end
+ large_body = table.concat(large_body, "")
+
+ local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/echo"
+
+ local httpc = http.new()
+ local res, err = httpc:request_uri(uri,
+ {
+ method = "POST",
+ body = large_body,
+ }
+ )
+ ngx.say(res.body)
+ }
+ }
+--- request
+GET /t
+--- error_log eval
+qr/send data to kafka: \{.*"body":"hello(l{251})".*/
+--- response_body eval
+qr/hello.*/
+
+
+
+=== TEST 16: set route(large_body, meta_format = default, include_resp_body =
true)
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [=[{
+ "plugins": {
+ "kafka-logger": {
+ "broker_list" :
+ {
+ "127.0.0.1":9092
+ },
+ "kafka_topic" : "test2",
+ "key" : "key1",
+ "timeout" : 1,
+ "meta_format": "default",
+ "include_resp_body": true,
+ "max_resp_body_bytes": 256,
+ "batch_max_size": 1
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1980": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/echo"
+ }]=]
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+
+--- response_body
+passed
+
+
+
+=== TEST 17: hit route(large_body, meta_format = default, include_resp_body =
true)
+--- config
+ location /t {
+ content_by_lua_block {
+ local core = require("apisix.core")
+ local t = require("lib.test_admin")
+ local http = require("resty.http")
+
+ local large_body = {
+ "h", "e", "l", "l", "o"
+ }
+
+ local size_in_bytes = 10 * 1024 -- 10kb
+ for i = 1, size_in_bytes do
+ large_body[i+5] = "l"
+ end
+ large_body = table.concat(large_body, "")
+
+ local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/echo"
+
+ local httpc = http.new()
+ local res, err = httpc:request_uri(uri,
+ {
+ method = "POST",
+ body = large_body,
+ }
+ )
+ ngx.say(res.body)
+ }
+ }
+--- request
+GET /t
+--- error_log eval
+qr/send data to kafka: \{.*"body":"hello(l{251})".*/
+--- response_body eval
+qr/hello.*/
+
+
+
+=== TEST 18: set route(large_body, meta_format = default, include_req_body =
true)
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [=[{
+ "plugins": {
+ "kafka-logger": {
+ "broker_list" :
+ {
+ "127.0.0.1":9092
+ },
+ "kafka_topic" : "test2",
+ "key" : "key1",
+ "timeout" : 1,
+ "meta_format": "default",
+ "include_req_body": true,
+ "max_req_body_bytes": 256,
+ "batch_max_size": 1
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1980": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/echo"
+ }]=]
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+
+--- response_body
+passed
+
+
+
+=== TEST 19: hit route(large_body, meta_format = default, include_req_body =
true)
+--- config
+ location /t {
+ content_by_lua_block {
+ local core = require("apisix.core")
+ local t = require("lib.test_admin")
+ local http = require("resty.http")
+
+ local large_body = {
+ "h", "e", "l", "l", "o"
+ }
+
+ local size_in_bytes = 10 * 1024 -- 10kb
+ for i = 1, size_in_bytes do
+ large_body[i+5] = "l"
+ end
+ large_body = table.concat(large_body, "")
+
+ local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/echo"
+
+ local httpc = http.new()
+ local res, err = httpc:request_uri(uri,
+ {
+ method = "POST",
+ body = large_body,
+ }
+ )
+ ngx.say(res.body)
+ }
+ }
+--- request
+GET /t
+--- error_log eval
+qr/send data to kafka: \{.*"body":"hello(l{251})".*/
+--- response_body eval
+qr/hello.*/
+
+
+
+=== TEST 20: set route(large_body, meta_format = default, include_resp_body =
true)
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [=[{
+ "plugins": {
+ "kafka-logger": {
+ "broker_list" :
+ {
+ "127.0.0.1":9092
+ },
+ "kafka_topic" : "test2",
+ "key" : "key1",
+ "timeout" : 1,
+ "meta_format": "default",
+ "include_resp_body": true,
+ "max_resp_body_bytes": 256,
+ "batch_max_size": 1
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1970": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/large_resp"
+ }]=]
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+
+--- response_body
+passed
+
+
+
+=== TEST 21: truncate upstream response body 1m to 256 bytes
+--- request
+GET /large_resp
+--- response_body eval
+qr/hello.*/
+--- error_log eval
+qr/send data to kafka: \{.*"body":"hello(l{251})".*/
+
+
+
+=== TEST 22: set route(large_body, meta_format = default, include_req_body =
true)
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [=[{
+ "plugins": {
+ "kafka-logger": {
+ "broker_list" :
+ {
+ "127.0.0.1":9092
+ },
+ "kafka_topic" : "test2",
+ "key" : "key1",
+ "timeout" : 1,
+ "meta_format": "default",
+ "include_req_body": true,
+ "max_req_body_bytes": 256,
+ "batch_max_size": 1
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1980": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/hello"
+ }]=]
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+
+--- response_body
+passed
+
+
+
+=== TEST 23: truncate upstream request body 1m to 256 bytes
+--- config
+ location /t {
+ content_by_lua_block {
+ local core = require("apisix.core")
+ local t = require("lib.test_admin")
+ local http = require("resty.http")
+
+ local large_body = {
+ "h", "e", "l", "l", "o"
+ }
+
+            local size_in_bytes = 100 * 1024 -- 100kb
+ for i = 1, size_in_bytes do
+ large_body[i+5] = "l"
+ end
+ large_body = table.concat(large_body, "")
+
+ local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
+
+ local httpc = http.new()
+ local res, err = httpc:request_uri(uri,
+ {
+ method = "POST",
+ body = large_body,
+ }
+ )
+
+ if err then
+ ngx.say(err)
+ end
+
+ ngx.say(res.body)
+ }
+ }
+--- request
+GET /t
+--- response_body_like
+hello world
+--- error_log eval
+qr/send data to kafka: \{.*"body":"hello(l{251})".*/
+
+
+
+=== TEST 24: set route(meta_format = default, include_req_body = true)
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "plugins": {
+ "kafka-logger": {
+ "broker_list" : {
+ "127.0.0.1":9092
+ },
+ "kafka_topic" : "test2",
+ "key" : "key1",
+ "timeout" : 1,
+ "batch_max_size": 1,
+ "max_req_body_bytes": 5,
+ "include_req_body": true,
+ "meta_format": "default"
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1980": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/hello"
+ }]]
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- response_body
+passed
+
+
+
+=== TEST 25: empty request body
+--- request
+GET /hello?ab=cd
+--- response_body
+hello world
+--- error_log eval
+qr/send data to kafka/
+--- wait: 2