This is an automated email from the ASF dual-hosted git repository.

wenming pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new ea691046e feat(kafka-logger): add max req/resp body size attributes 
(#11133)
ea691046e is described below

commit ea691046ecf9790695068387ae3ef7732355e098
Author: Abhishek Choudhary <[email protected]>
AuthorDate: Tue Apr 16 08:39:01 2024 +0545

    feat(kafka-logger): add max req/resp body size attributes (#11133)
---
 apisix/core/response.lua               |  16 +-
 apisix/plugins/kafka-logger.lua        |  30 ++
 apisix/utils/log-util.lua              |  67 ++-
 docs/en/latest/plugins/kafka-logger.md |   2 +
 t/plugin/kafka-logger-large-body.t     | 869 +++++++++++++++++++++++++++++++++
 5 files changed, 966 insertions(+), 18 deletions(-)

diff --git a/apisix/core/response.lua b/apisix/core/response.lua
index 04430abd5..baee97749 100644
--- a/apisix/core/response.lua
+++ b/apisix/core/response.lua
@@ -176,7 +176,7 @@ end
 --  final_body = transform(final_body)
 --  ngx.arg[1] = final_body
 --  ...
-function _M.hold_body_chunk(ctx, hold_the_copy)
+function _M.hold_body_chunk(ctx, hold_the_copy, max_resp_body_bytes)
     local body_buffer
     local chunk, eof = arg[1], arg[2]
 
@@ -192,22 +192,32 @@ function _M.hold_body_chunk(ctx, hold_the_copy)
                 n = 1
             }
             ctx._body_buffer[ctx._plugin_name] = body_buffer
+            ctx._resp_body_bytes = #chunk
         else
             local n = body_buffer.n + 1
             body_buffer.n = n
             body_buffer[n] = chunk
+            ctx._resp_body_bytes = ctx._resp_body_bytes + #chunk
+        end
+        if max_resp_body_bytes and ctx._resp_body_bytes >= max_resp_body_bytes 
then
+            local body_data = concat_tab(body_buffer, "", 1, body_buffer.n)
+            body_data = str_sub(body_data, 1, max_resp_body_bytes)
+            return body_data
         end
     end
 
     if eof then
         body_buffer = ctx._body_buffer[ctx._plugin_name]
         if not body_buffer then
+            if max_resp_body_bytes and #chunk >= max_resp_body_bytes then
+                chunk = str_sub(chunk, 1, max_resp_body_bytes)
+            end
             return chunk
         end
 
-        body_buffer = concat_tab(body_buffer, "", 1, body_buffer.n)
+        local body_data = concat_tab(body_buffer, "", 1, body_buffer.n)
         ctx._body_buffer[ctx._plugin_name] = nil
-        return body_buffer
+        return body_data
     end
 
     if not hold_the_copy then
diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua
index ca004e9cc..adeec2921 100644
--- a/apisix/plugins/kafka-logger.lua
+++ b/apisix/plugins/kafka-logger.lua
@@ -14,6 +14,7 @@
 -- See the License for the specific language governing permissions and
 -- limitations under the License.
 --
+local expr     = require("resty.expr.v1")
 local core     = require("apisix.core")
 local log_util = require("apisix.utils.log-util")
 local producer = require ("resty.kafka.producer")
@@ -22,6 +23,7 @@ local bp_manager_mod = 
require("apisix.utils.batch-processor-manager")
 local math     = math
 local pairs    = pairs
 local type     = type
+local req_read_body = ngx.req.read_body
 local plugin_name = "kafka-logger"
 local batch_processor_manager = bp_manager_mod.new("kafka logger")
 
@@ -115,6 +117,8 @@ local schema = {
                 type = "array"
             }
         },
+        max_req_body_bytes = {type = "integer", minimum = 1, default = 524288},
+        max_resp_body_bytes = {type = "integer", minimum = 1, default = 
524288},
         -- in lua-resty-kafka, cluster_name is defined as number
         -- see https://github.com/doujiang24/lua-resty-kafka#new-1
         cluster_name = {type = "integer", minimum = 1, default = 1},
@@ -210,6 +214,32 @@ local function send_kafka_data(conf, log_message, prod)
 end
 
 
+function _M.access(conf, ctx)  -- reads the request body when conf asks for it
+    if conf.include_req_body then
+        local should_read_body = true  -- stays true when no expr is configured
+        if conf.include_req_body_expr then
+            if not conf.request_expr then  -- build the expr once, keep it on conf
+                local request_expr, err = expr.new(conf.include_req_body_expr)
+                if not request_expr then
+                    core.log.error('generate request expr err ', err)
+                    return  -- expr could not be built: body is not read here
+                end
+                conf.request_expr = request_expr
+            end
+
+            local result = conf.request_expr:eval(ctx.var)
+
+            if not result then  -- expr evaluated falsy: skip reading the body
+                should_read_body = false
+            end
+        end
+        if should_read_body then
+            req_read_body()  -- ngx.req.read_body, aliased at the top of the file
+        end
+    end
+end
+
+
 function _M.body_filter(conf, ctx)
     log_util.collect_body(conf, ctx)
 end
diff --git a/apisix/utils/log-util.lua b/apisix/utils/log-util.lua
index a3ff834ee..e53daca80 100644
--- a/apisix/utils/log-util.lua
+++ b/apisix/utils/log-util.lua
@@ -24,10 +24,15 @@ local ngx_now = ngx.now
 local ngx_header = ngx.header
 local os_date = os.date
 local str_byte = string.byte
+local str_sub  = string.sub
 local math_floor = math.floor
 local ngx_update_time = ngx.update_time
 local req_get_body_data = ngx.req.get_body_data
 local is_http = ngx.config.subsystem == "http"
+local req_get_body_file = ngx.req.get_body_file
+local MAX_REQ_BODY      = 524288      -- 512 KiB
+local MAX_RESP_BODY     = 524288      -- 512 KiB
+local io                = io
 
 local lru_log_format = core.lrucache.new({
     ttl = 300, count = 512
@@ -36,6 +41,34 @@ local lru_log_format = core.lrucache.new({
 local _M = {}
 
 
+local function get_request_body(max_bytes)  -- returns body, nil, or nil + err
+    local req_body = req_get_body_data()
+    if req_body then
+        if max_bytes and #req_body >= max_bytes then
+            req_body = str_sub(req_body, 1, max_bytes)  -- keep first max_bytes bytes
+        end
+        return req_body
+    end
+
+    local file_name = req_get_body_file()  -- no in-memory data: look for a body file
+    if not file_name then
+        return nil  -- neither in-memory data nor a body file
+    end
+
+    core.log.info("attempt to read body from file: ", file_name)
+
+    local f, err = io.open(file_name, 'r')
+    if not f then
+        return nil, "fail to open file " .. err
+    end
+
+    req_body = f:read(max_bytes)  -- numeric argument: read up to max_bytes bytes
+    f:close()
+
+    return req_body
+end
+
+
 local function gen_log_format(format)
     local log_format = {}
     for k, var_name in pairs(format) do
@@ -181,15 +214,13 @@ local function get_full_log(ngx, conf)
         end
 
         if log_request_body then
-            local body = req_get_body_data()
-            if body then
-                log.request.body = body
-            else
-                local body_file = ngx.req.get_body_file()
-                if body_file then
-                    log.request.body_file = body_file
-                end
+            local max_req_body_bytes = conf.max_req_body_bytes or MAX_REQ_BODY
+            local body, err = get_request_body(max_req_body_bytes)
+            if err then
+                core.log.error("fail to get request body: ", err)
+                return
             end
+            log.request.body = body
         end
     end
 
@@ -252,20 +283,21 @@ end
 
 
 function _M.get_req_original(ctx, conf)
-    local headers = {
+    local data = {  -- pieces of the raw request, joined by the final concat
         ctx.var.request, "\r\n"
     }
     for k, v in pairs(ngx.req.get_headers()) do
-        core.table.insert_tail(headers, k, ": ", v, "\r\n")
+        core.table.insert_tail(data, k, ": ", v, "\r\n")
     end
-    -- core.log.error("headers: ", core.table.concat(headers, ""))
-    core.table.insert(headers, "\r\n")
+    core.table.insert(data, "\r\n")  -- blank line after the header lines
 
     if conf.include_req_body then
-        core.table.insert(headers, ctx.var.request_body)
+        local max_req_body_bytes = conf.max_req_body_bytes or MAX_REQ_BODY
+        local req_body = get_request_body(max_req_body_bytes)  -- second return (err) is dropped
+        core.table.insert(data, req_body)
     end
 
-    return core.table.concat(headers, "")
+    return core.table.concat(data, "")
 end
 
 
@@ -310,7 +342,12 @@ function _M.collect_body(conf, ctx)
         end
 
         if log_response_body then
-            local final_body = core.response.hold_body_chunk(ctx, true)
+            local max_resp_body_bytes = conf.max_resp_body_bytes or 
MAX_RESP_BODY
+
+            if ctx._resp_body_bytes and ctx._resp_body_bytes >= 
max_resp_body_bytes then
+                return
+            end
+            local final_body = core.response.hold_body_chunk(ctx, true, 
max_resp_body_bytes)
             if not final_body then
                 return
             end
diff --git a/docs/en/latest/plugins/kafka-logger.md 
b/docs/en/latest/plugins/kafka-logger.md
index ecf087c17..fa70eb129 100644
--- a/docs/en/latest/plugins/kafka-logger.md
+++ b/docs/en/latest/plugins/kafka-logger.md
@@ -55,8 +55,10 @@ It might take some time to receive the log data. It will be 
automatically sent a
 | log_format | object | False    |   |               | Log format declared as 
key value pairs in JSON format. Values only support strings. 
[APISIX](../apisix-variable.md) or 
[Nginx](http://nginx.org/en/docs/varindex.html) variables can be used by 
prefixing the string with `$`. |
 | include_req_body       | boolean | False    | false          | [false, true] 
        | When set to `true` includes the request body in the log. If the 
request body is too big to be kept in the memory, it can't be logged due to 
Nginx's limitations.                                                            
                                                                                
                                     |
 | include_req_body_expr  | array   | False    |                |               
        | Filter for when the `include_req_body` attribute is set to `true`. 
Request body is only logged when the expression set here evaluates to `true`. 
See [lua-resty-expr](https://github.com/api7/lua-resty-expr) for more.          
                                                                                
                                |
+| max_req_body_bytes     | integer | False    | 524288         | >=1           
        | Request bodies within this size will be pushed to Kafka; if the size 
exceeds the configured value it will be truncated before pushing to Kafka.      
                                                                                
                                                                                
                            |
 | include_resp_body      | boolean | False    | false          | [false, true] 
        | When set to `true` includes the response body in the log.             
                                                                                
                                                                                
                                                                                
                           |
 | include_resp_body_expr | array   | False    |                |               
        | Filter for when the `include_resp_body` attribute is set to `true`. 
Response body is only logged when the expression set here evaluates to `true`. 
See [lua-resty-expr](https://github.com/api7/lua-resty-expr) for more.          
                                                                                
                              |
+| max_resp_body_bytes    | integer | False    | 524288         | >=1           
        | Response bodies within this size will be pushed to Kafka; if the size 
exceeds the configured value it will be truncated before pushing to Kafka.      
                                                                                
                                                                                
                            |
 | cluster_name           | integer | False    | 1              | [0,...]       
        | Name of the cluster. Used when there are two or more Kafka clusters. 
Only works if the `producer_type` attribute is set to `async`.                  
                                                                                
                                                                                
                            |
 | producer_batch_num     | integer | optional    | 200            | [1,...]    
           | `batch_num` parameter in 
[lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka). The merge 
message and batch is send to the server. Unit is message count.                 
                                                                                
                                                                                
                                                            [...]
 | producer_batch_size    | integer | optional    | 1048576        | [0,...]    
           | `batch_size` parameter in 
[lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) in bytes.      
                                                                                
                                                                                
                                                                                
                                                       [...]
diff --git a/t/plugin/kafka-logger-large-body.t 
b/t/plugin/kafka-logger-large-body.t
new file mode 100644
index 000000000..e86c64591
--- /dev/null
+++ b/t/plugin/kafka-logger-large-body.t
@@ -0,0 +1,869 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+no_long_string();
+no_root_location();
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    if (!$block->request) {  # block has no request section: default to GET /t
+        $block->set_value("request", "GET /t");
+    }
+
+    my $http_config = $block->http_config // <<_EOC_;
+    # fake server, only for test
+    server {
+        listen 1970;
+        location /large_resp {
+            content_by_lua_block {
+                local large_body = {
+                    "h", "e", "l", "l", "o"
+                }
+
+                local size_in_bytes = 1024 * 1024 -- 1mb
+                for i = 1, size_in_bytes do
+                    large_body[i+5] = "l"
+                end
+                large_body = table.concat(large_body, "")
+
+                ngx.say(large_body)
+            }
+        }
+    }
+_EOC_
+
+    $block->set_value("http_config", $http_config);  # block's own http_config, else the fake server above
+});
+
+run_tests;
+
+__DATA__
+
+=== TEST 1: max_req_body_bytes is not an integer
+--- config
+    location /t {
+        content_by_lua_block {
+            local plugin = require("apisix.plugins.kafka-logger")
+            local ok, err = plugin.check_schema({
+                broker_list= {
+                    ["127.0.0.1"] = 9092
+                },
+                kafka_topic = "test2",
+                key = "key1",
+                timeout = 1,
+                batch_max_size = 1,
+                max_req_body_bytes = "10",
+                include_req_body = true,
+                meta_format = "origin"
+            })
+            if not ok then
+                ngx.say(err)
+            end
+            ngx.say("done")
+        }
+    }
+--- response_body
+property "max_req_body_bytes" validation failed: wrong type: expected integer, 
got string
+done
+
+
+
+=== TEST 2: set route(meta_format = origin, include_req_body = true)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "kafka-logger": {
+                                "broker_list" : {
+                                    "127.0.0.1":9092
+                                },
+                                "kafka_topic" : "test2",
+                                "key" : "key1",
+                                "timeout" : 1,
+                                "batch_max_size": 1,
+                                "max_req_body_bytes": 5,
+                                "include_req_body": true,
+                                "meta_format": "origin"
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 3: hit route(meta_format = origin, include_req_body = true)
+--- request
+GET /hello?ab=cd
+abcdef
+--- response_body
+hello world
+--- error_log
+send data to kafka: GET /hello?ab=cd HTTP/1.1
+host: localhost
+content-length: 6
+connection: close
+abcde
+--- wait: 2
+
+
+
+=== TEST 4: set route(meta_format = default, include_req_body = true)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "kafka-logger": {
+                                "broker_list" : {
+                                    "127.0.0.1":9092
+                                },
+                                "kafka_topic" : "test2",
+                                "key" : "key1",
+                                "timeout" : 1,
+                                "batch_max_size": 1,
+                                "max_req_body_bytes": 5,
+                                "include_req_body": true
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 5: hit route(meta_format = default, include_req_body = true)
+--- request
+GET /hello?ab=cd
+abcdef
+--- response_body
+hello world
+--- error_log_like eval
+qr/"body": "abcde"/
+--- wait: 2
+
+
+
+=== TEST 6: set route(id: 1, meta_format = default, include_resp_body = true)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [=[{
+                        "plugins": {
+                            "kafka-logger": {
+                                "broker_list" :
+                                  {
+                                    "127.0.0.1":9092
+                                  },
+                                "kafka_topic" : "test2",
+                                "key" : "key1",
+                                "timeout" : 1,
+                                "max_resp_body_bytes": 5,
+                                "include_resp_body": true,
+                                "batch_max_size": 1
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]=]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+
+--- response_body
+passed
+
+
+
+=== TEST 7: hit route(meta_format = default, include_resp_body = true)
+--- request
+POST /hello?name=qwerty
+abcdef
+--- response_body
+hello world
+--- error_log eval
+qr/send data to kafka: \{.*"body":"hello"/
+--- wait: 2
+
+
+
+=== TEST 8: set route(id: 1, meta_format = origin, include_resp_body = true)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [=[{
+                        "plugins": {
+                            "kafka-logger": {
+                                "broker_list" :
+                                  {
+                                    "127.0.0.1":9092
+                                  },
+                                "kafka_topic" : "test2",
+                                "key" : "key1",
+                                "timeout" : 1,
+                                "meta_format": "origin",
+                                "include_resp_body": true,
+                                "max_resp_body_bytes": 5,
+                                "batch_max_size": 1
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]=]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+
+--- response_body
+passed
+
+
+
+=== TEST 9: hit route(meta_format = origin, include_resp_body = true)
+--- request
+POST /hello?name=qwerty
+abcdef
+--- response_body
+hello world
+--- error_log
+send data to kafka: POST /hello?name=qwerty HTTP/1.1
+host: localhost
+content-length: 6
+connection: close
+--- wait: 2
+
+
+
+=== TEST 10: set route(id: 1, meta_format = default, include_resp_body = true, 
include_req_body = true)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [=[{
+                        "plugins": {
+                            "kafka-logger": {
+                                "broker_list" :
+                                  {
+                                    "127.0.0.1":9092
+                                  },
+                                "kafka_topic" : "test2",
+                                "key" : "key1",
+                                "timeout" : 1,
+                                "meta_format": "default",
+                                "include_req_body": true,
+                                "max_req_body_bytes": 5,
+                                "include_resp_body": true,
+                                "max_resp_body_bytes": 5,
+                                "batch_max_size": 1
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]=]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+
+--- response_body
+passed
+
+
+
+=== TEST 11: hit route(meta_format = default, include_resp_body = true, 
include_req_body = true)
+--- request
+POST /hello?name=qwerty
+abcdef
+--- response_body
+hello world
+--- error_log eval
+qr/send data to kafka: \{.*"body":"abcde"/
+--- error_log_like
+*"body":"hello"
+--- wait: 2
+
+
+
+=== TEST 12: set route(id: 1, meta_format = default, include_resp_body = 
false, include_req_body = false)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [=[{
+                        "plugins": {
+                            "kafka-logger": {
+                                "broker_list" :
+                                  {
+                                    "127.0.0.1":9092
+                                  },
+                                "kafka_topic" : "test2",
+                                "key" : "key1",
+                                "timeout" : 1,
+                                "meta_format": "default",
+                                "batch_max_size": 1
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]=]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+
+--- response_body
+passed
+
+
+
+=== TEST 13: hit route(meta_format = default, include_resp_body = false, 
include_req_body = false)
+--- request
+POST /hello?name=qwerty
+abcdef
+--- response_body
+hello world
+--- no_error_log eval
+qr/send data to kafka: \{.*"body":.*/
+--- wait: 2
+
+
+
+=== TEST 14: set route(large_body, meta_format = default, include_resp_body = 
true, include_req_body = true)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [=[{
+                        "plugins": {
+                            "kafka-logger": {
+                                "broker_list" :
+                                  {
+                                    "127.0.0.1":9092
+                                  },
+                                "kafka_topic" : "test2",
+                                "key" : "key1",
+                                "timeout" : 1,
+                                "meta_format": "default",
+                                "include_req_body": true,
+                                "max_req_body_bytes": 256,
+                                "include_resp_body": true,
+                                "max_resp_body_bytes": 256,
+                                "batch_max_size": 1
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/echo"
+                }]=]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+
+--- response_body
+passed
+
+
+
+=== TEST 15: hit route(large_body, meta_format = default, include_resp_body = 
true, include_req_body = true)
+--- config
+    location /t {
+        content_by_lua_block {
+            local core = require("apisix.core")
+            local t    = require("lib.test_admin")
+            local http = require("resty.http")
+
+            local large_body = {
+                "h", "e", "l", "l", "o"
+            }
+
+            local size_in_bytes = 10 * 1024 -- 10kb
+            for i = 1, size_in_bytes do
+                large_body[i+5] = "l"
+            end
+            large_body = table.concat(large_body, "")
+
+            local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/echo"
+
+            local httpc = http.new()
+            local res, err = httpc:request_uri(uri,
+                {
+                    method = "POST",
+                    body = large_body,
+                }
+            )
+            ngx.say(res.body)
+        }
+    }
+--- request
+GET /t
+--- error_log eval
+qr/send data to kafka: \{.*"body":"hello(l{251})".*/
+--- response_body eval
+qr/hello.*/
+
+
+
+=== TEST 16: set route(large_body, meta_format = default, include_resp_body = 
true)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [=[{
+                        "plugins": {
+                            "kafka-logger": {
+                                "broker_list" :
+                                  {
+                                    "127.0.0.1":9092
+                                  },
+                                "kafka_topic" : "test2",
+                                "key" : "key1",
+                                "timeout" : 1,
+                                "meta_format": "default",
+                                "include_resp_body": true,
+                                "max_resp_body_bytes": 256,
+                                "batch_max_size": 1
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/echo"
+                }]=]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+
+--- response_body
+passed
+
+
+
+=== TEST 17: hit route(large_body, meta_format = default, include_resp_body = 
true)
+--- config
+    location /t {
+        content_by_lua_block {
+            local core = require("apisix.core")
+            local t    = require("lib.test_admin")
+            local http = require("resty.http")
+
+            local large_body = {
+                "h", "e", "l", "l", "o"
+            }
+
+            local size_in_bytes = 10 * 1024 -- 10kb
+            for i = 1, size_in_bytes do
+                large_body[i+5] = "l"
+            end
+            large_body = table.concat(large_body, "")
+
+            local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/echo"
+
+            local httpc = http.new()
+            local res, err = httpc:request_uri(uri,
+                {
+                    method = "POST",
+                    body = large_body,
+                }
+            )
+            ngx.say(res.body)
+        }
+    }
+--- request
+GET /t
+--- error_log eval
+qr/send data to kafka: \{.*"body":"hello(l{251})".*/
+--- response_body eval
+qr/hello.*/
+
+
+
+=== TEST 18: set route(large_body, meta_format = default, include_req_body = 
true)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [=[{
+                        "plugins": {
+                            "kafka-logger": {
+                                "broker_list" :
+                                  {
+                                    "127.0.0.1":9092
+                                  },
+                                "kafka_topic" : "test2",
+                                "key" : "key1",
+                                "timeout" : 1,
+                                "meta_format": "default",
+                                "include_req_body": true,
+                                "max_req_body_bytes": 256,
+                                "batch_max_size": 1
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/echo"
+                }]=]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+
+--- response_body
+passed
+
+
+
+=== TEST 19: hit route(large_body, meta_format = default, include_req_body = true)
+--- config
+    location /t {
+        content_by_lua_block {
+            local core = require("apisix.core")
+            local t    = require("lib.test_admin")
+            local http = require("resty.http")
+
+            local large_body = {
+                "h", "e", "l", "l", "o"
+            }
+
+            local size_in_bytes = 10 * 1024 -- 10kb
+            for i = 1, size_in_bytes do
+                large_body[i+5] = "l"
+            end
+            large_body = table.concat(large_body, "")
+
+            local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/echo"
+
+            local httpc = http.new()
+            local res, err = httpc:request_uri(uri,
+                {
+                    method = "POST",
+                    body = large_body,
+                }
+            )
+            ngx.say(res.body)
+        }
+    }
+--- request
+GET /t
+--- error_log eval
+qr/send data to kafka: \{.*"body":"hello(l{251})".*/
+--- response_body eval
+qr/hello.*/
+
+
+
+=== TEST 20: set route(large_body, meta_format = default, include_resp_body = true)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [=[{
+                        "plugins": {
+                            "kafka-logger": {
+                                "broker_list" :
+                                  {
+                                    "127.0.0.1":9092
+                                  },
+                                "kafka_topic" : "test2",
+                                "key" : "key1",
+                                "timeout" : 1,
+                                "meta_format": "default",
+                                "include_resp_body": true,
+                                "max_resp_body_bytes": 256,
+                                "batch_max_size": 1
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1970": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/large_resp"
+                }]=]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+
+--- response_body
+passed
+
+
+
+=== TEST 21: truncate upstream response body 1m to 256 bytes
+--- request
+GET /large_resp
+--- response_body eval
+qr/hello.*/
+--- error_log eval
+qr/send data to kafka: \{.*"body":"hello(l{251})".*/
+
+
+
+=== TEST 22: set route(large_body, meta_format = default, include_req_body = true)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [=[{
+                        "plugins": {
+                            "kafka-logger": {
+                                "broker_list" :
+                                  {
+                                    "127.0.0.1":9092
+                                  },
+                                "kafka_topic" : "test2",
+                                "key" : "key1",
+                                "timeout" : 1,
+                                "meta_format": "default",
+                                "include_req_body": true,
+                                "max_req_body_bytes": 256,
+                                "batch_max_size": 1
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]=]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+
+--- response_body
+passed
+
+
+
+=== TEST 23: truncate upstream request body 1m to 256 bytes
+--- config
+    location /t {
+        content_by_lua_block {
+            local core = require("apisix.core")
+            local t    = require("lib.test_admin")
+            local http = require("resty.http")
+
+            local large_body = {
+                "h", "e", "l", "l", "o"
+            }
+
+            local size_in_bytes = 100 * 1024 -- 100kb
+            for i = 1, size_in_bytes do
+                large_body[i+5] = "l"
+            end
+            large_body = table.concat(large_body, "")
+
+            local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
+
+            local httpc = http.new()
+            local res, err = httpc:request_uri(uri,
+                {
+                    method = "POST",
+                    body = large_body,
+                }
+            )
+
+            if err then
+                ngx.say(err)
+            end
+
+            ngx.say(res.body)
+        }
+    }
+--- request
+GET /t
+--- response_body_like
+hello world
+--- error_log eval
+qr/send data to kafka: \{.*"body":"hello(l{251})".*/
+
+
+
+=== TEST 24: set route(meta_format = default, include_req_body = true)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "kafka-logger": {
+                                "broker_list" : {
+                                    "127.0.0.1":9092
+                                },
+                                "kafka_topic" : "test2",
+                                "key" : "key1",
+                                "timeout" : 1,
+                                "batch_max_size": 1,
+                                "max_req_body_bytes": 5,
+                                "include_req_body": true,
+                                "meta_format": "default"
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 25: empty request body
+--- request
+GET /hello?ab=cd
+--- response_body
+hello world
+--- error_log eval
+qr/send data to kafka/
+--- wait: 2


Reply via email to