This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new bc091ad  test(kafka-logger): reduce duplicate sections (#5717)
bc091ad is described below

commit bc091ad433be7bf7b13fae3e9356e529657cb0be
Author: 罗泽轩 <spacewander...@gmail.com>
AuthorDate: Wed Dec 8 09:11:46 2021 +0800

    test(kafka-logger): reduce duplicate sections (#5717)
---
 t/plugin/kafka-logger.t  | 689 +----------------------------------------------
 t/plugin/kafka-logger2.t | 611 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 624 insertions(+), 676 deletions(-)

diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t
index 42277c6..0cc68fe 100644
--- a/t/plugin/kafka-logger.t
+++ b/t/plugin/kafka-logger.t
@@ -19,6 +19,19 @@ use t::APISIX 'no_plan';
 repeat_each(1);
 no_long_string();
 no_root_location();
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    if (!$block->request) {
+        $block->set_value("request", "GET /t");
+    }
+
+    if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
+        $block->set_value("no_error_log", "[error]");
+    }
+});
+
 run_tests;
 
 __DATA__
@@ -41,12 +54,8 @@ __DATA__
             ngx.say("done")
         }
     }
---- request
-GET /t
 --- response_body
 done
---- no_error_log
-[error]
 
 
 
@@ -62,13 +71,9 @@ done
             ngx.say("done")
         }
     }
---- request
-GET /t
 --- response_body
 property "broker_list" is required
 done
---- no_error_log
-[error]
 
 
 
@@ -91,13 +96,9 @@ done
             ngx.say("done")
         }
     }
---- request
-GET /t
 --- response_body
 property "timeout" validation failed: wrong type: expected integer, got string
 done
---- no_error_log
-[error]
 
 
 
@@ -163,12 +164,8 @@ done
             ngx.say(body)
         }
     }
---- request
-GET /t
 --- response_body
 passed
---- no_error_log
-[error]
 
 
 
@@ -177,8 +174,6 @@ passed
 GET /hello
 --- response_body
 hello world
---- no_error_log
-[error]
 --- wait: 2
 
 
@@ -251,8 +246,6 @@ hello world
             local res, err = httpc:request_uri(uri, {method = "GET"})
         }
     }
---- request
-GET /t
 --- error_log
 failed to send data to Kafka topic
 [error]
@@ -296,12 +289,8 @@ failed to send data to Kafka topic
             ngx.say(body)
         }
     }
---- request
-GET /t
 --- response_body
 passed
---- no_error_log
-[error]
 
 
 
@@ -311,8 +300,6 @@ GET /hello?ab=cd
 abcdef
 --- response_body
 hello world
---- no_error_log
-[error]
 --- error_log
 send data to kafka: GET /hello?ab=cd HTTP/1.1
 host: localhost
@@ -360,12 +347,8 @@ abcdef
             ngx.say(body)
         }
     }
---- request
-GET /t
 --- response_body
 passed
---- no_error_log
-[error]
 
 
 
@@ -375,8 +358,6 @@ GET /hello?ab=cd
 abcdef
 --- response_body
 hello world
---- no_error_log
-[error]
 --- error_log
 send data to kafka: GET /hello?ab=cd HTTP/1.1
 host: localhost
@@ -421,12 +402,8 @@ connection: close
             ngx.say(body)
         }
     }
---- request
-GET /t
 --- response_body
 passed
---- no_error_log
-[error]
 
 
 
@@ -436,8 +413,6 @@ GET /hello?ab=cd
 abcdef
 --- response_body
 hello world
---- no_error_log
-[error]
 --- error_log_like eval
 qr/send data to kafka: \{.*"upstream":"127.0.0.1:1980"/
 --- wait: 2
@@ -504,12 +479,8 @@ qr/send data to kafka: \{.*"upstream":"127.0.0.1:1980"/
             ngx.say(body)
         }
     }
---- request
-GET /t
 --- response_body
 passed
---- no_error_log
-[error]
 
 
 
@@ -518,8 +489,6 @@ passed
 GET /hello
 --- response_body
 hello world
---- no_error_log
-[error]
 --- wait: 2
 
 
@@ -558,12 +527,8 @@ hello world
             ngx.say(body)
         }
     }
---- request
-GET /t
 --- response_body
 passed
---- no_error_log
-[error]
 
 
 
@@ -573,8 +538,6 @@ GET /hello?ab=cd
 abcdef
 --- response_body
 hello world
---- no_error_log
-[error]
 --- error_log_like eval
 qr/send data to kafka: \{.*"upstream":"127.0.0.1:1980"/
 --- wait: 2
@@ -615,12 +578,8 @@ qr/send data to kafka: \{.*"upstream":"127.0.0.1:1980"/
             ngx.say(body)
         }
     }
---- request
-GET /t
 --- response_body
 passed
---- no_error_log
-[error]
 
 
 
@@ -662,12 +621,8 @@ passed
             ngx.sleep(0.5)
         }
     }
---- request
-GET /t
 --- timeout: 5s
 --- ignore_response
---- no_error_log
-[error]
 --- error_log eval
 [qr/partition_id: 1/,
 qr/partition_id: 0/,
@@ -712,627 +667,9 @@ qr/partition_id: 2/]
             ngx.sleep(0.5)
         }
     }
---- request
-GET /t
 --- timeout: 5s
 --- ignore_response
---- no_error_log
-[error]
 --- error_log eval
 [qr/partition_id: 1/,
 qr/partition_id: 0/,
 qr/partition_id: 2/]
-
-
-
-=== TEST 20: required_acks, matches none of the enum values
---- config
-    location /t {
-        content_by_lua_block {
-            local plugin = require("apisix.plugins.kafka-logger")
-            local ok, err = plugin.check_schema({
-                broker_list = {
-                    ["127.0.0.1"] = 3000
-                },
-                required_acks = 10,
-                kafka_topic ="test",
-                key= "key1"
-            })
-            if not ok then
-                ngx.say(err)
-            end
-            ngx.say("done")
-        }
-    }
---- request
-GET /t
---- response_body
-property "required_acks" validation failed: matches none of the enum values
-done
---- no_error_log
-[error]
-
-
-
-=== TEST 21: report log to kafka, with required_acks(1, 0, -1)
---- config
-location /t {
-    content_by_lua_block {
-        local data = {
-            {
-                input = {
-                    plugins = {
-                        ["kafka-logger"] = {
-                            broker_list = {
-                                ["127.0.0.1"] = 9092
-                            },
-                            kafka_topic = "test2",
-                            producer_type = "sync",
-                            timeout = 1,
-                            batch_max_size = 1,
-                            required_acks = 1,
-                            meta_format = "origin",
-                        }
-                    },
-                    upstream = {
-                        nodes = {
-                            ["127.0.0.1:1980"] = 1
-                        },
-                        type = "roundrobin"
-                    },
-                    uri = "/hello",
-                },
-            },
-            {
-                input = {
-                    plugins = {
-                        ["kafka-logger"] = {
-                            broker_list = {
-                                ["127.0.0.1"] = 9092
-                            },
-                            kafka_topic = "test2",
-                            producer_type = "sync",
-                            timeout = 1,
-                            batch_max_size = 1,
-                            required_acks = -1,
-                            meta_format = "origin",
-                        }
-                    },
-                    upstream = {
-                        nodes = {
-                            ["127.0.0.1:1980"] = 1
-                        },
-                        type = "roundrobin"
-                    },
-                    uri = "/hello",
-                },
-            },
-            {
-                input = {
-                    plugins = {
-                        ["kafka-logger"] = {
-                            broker_list = {
-                                ["127.0.0.1"] = 9092
-                            },
-                            kafka_topic = "test2",
-                            producer_type = "sync",
-                            timeout = 1,
-                            batch_max_size = 1,
-                            required_acks = 0,
-                            meta_format = "origin",
-                        }
-                    },
-                    upstream = {
-                        nodes = {
-                            ["127.0.0.1:1980"] = 1
-                        },
-                        type = "roundrobin"
-                    },
-                    uri = "/hello",
-                },
-            },
-        }
-
-        local t = require("lib.test_admin").test
-        local err_count = 0
-        for i in ipairs(data) do
-            local code, body = t('/apisix/admin/routes/1', ngx.HTTP_PUT, 
data[i].input)
-
-            if code >= 300 then
-                err_count = err_count + 1
-            end
-            ngx.print(body)
-
-            t('/hello', ngx.HTTP_GET)
-        end
-
-        assert(err_count == 0)
-    }
-}
---- request
-GET /t
---- no_error_log
-[error]
---- error_log
-send data to kafka: GET /hello
-send data to kafka: GET /hello
-send data to kafka: GET /hello
-
-
-
-=== TEST 22: update the broker_list and cluster_name, generate different kafka 
producers
---- config
-    location /t {
-        content_by_lua_block {
-            local t = require("lib.test_admin").test
-            local code, body = t('/apisix/admin/routes/1',
-                 ngx.HTTP_PUT,
-                 [[{
-                    "upstream": {
-                        "nodes": {
-                            "127.0.0.1:1980": 1
-                        },
-                        "type": "roundrobin"
-                    },
-                    "uri": "/hello"
-                }]]
-            )
-            ngx.sleep(0.5)
-
-            if code >= 300 then
-                ngx.status = code
-                ngx.say("fail")
-                return
-            end
-
-            code, body = t('/apisix/admin/global_rules/1',
-                ngx.HTTP_PUT,
-                 [[{
-                    "plugins": {
-                        "kafka-logger": {
-                            "broker_list" : {
-                                "127.0.0.1": 9092
-                            },
-                            "kafka_topic" : "test2",
-                            "timeout" : 1,
-                            "batch_max_size": 1,
-                            "include_req_body": false,
-                            "cluster_name": 1
-                        }
-                    }
-                }]]
-            )
-
-            if code >= 300 then
-                ngx.status = code
-                ngx.say("fail")
-                return
-            end
-
-            t('/hello',ngx.HTTP_GET)
-            ngx.sleep(0.5)
-
-            code, body = t('/apisix/admin/global_rules/1',
-                ngx.HTTP_PUT,
-                 [[{
-                    "plugins": {
-                        "kafka-logger": {
-                            "broker_list" : {
-                                "127.0.0.1": 19092
-                            },
-                            "kafka_topic" : "test4",
-                            "timeout" : 1,
-                            "batch_max_size": 1,
-                            "include_req_body": false,
-                            "cluster_name": 2
-                        }
-                    }
-                }]]
-            )
-
-            if code >= 300 then
-                ngx.status = code
-                ngx.say("fail")
-                return
-            end
-
-            t('/hello',ngx.HTTP_GET)
-            ngx.sleep(0.5)
-
-            ngx.sleep(2)
-            ngx.say("passed")
-        }
-    }
---- request
-GET /t
---- timeout: 10
---- response
-passed
---- wait: 5
---- error_log
-phase_func(): kafka cluster name 1, broker_list[1] port 9092
-phase_func(): kafka cluster name 2, broker_list[1] port 19092
---- no_error_log eval
-qr/not found topic/
-
-
-
-=== TEST 23: use the topic that does not exist on kafka(even if kafka allows 
auto create topics, first time push messages to kafka would got this error)
---- config
-    location /t {
-        content_by_lua_block {
-            local t = require("lib.test_admin").test
-            local code, body = t('/apisix/admin/global_rules/1',
-                ngx.HTTP_PUT,
-                 [[{
-                    "plugins": {
-                        "kafka-logger": {
-                            "broker_list" : {
-                                "127.0.0.1": 9092
-                            },
-                            "kafka_topic" : "undefined_topic",
-                            "timeout" : 1,
-                            "batch_max_size": 1,
-                            "include_req_body": false
-                        }
-                    }
-                }]]
-            )
-
-            if code >= 300 then
-                ngx.status = code
-                ngx.say("fail")
-                return
-            end
-
-            t('/hello',ngx.HTTP_GET)
-            ngx.sleep(0.5)
-
-            ngx.sleep(2)
-            ngx.say("passed")
-        }
-    }
---- request
-GET /t
---- timeout: 5
---- response
-passed
---- error_log eval
-qr/not found topic, retryable: true, topic: undefined_topic, partition_id: -1/
-
-
-
-=== TEST 24: check broker_list via schema
---- config
-    location /t {
-        content_by_lua_block {
-            local data = {
-                {
-                    input = {
-                        broker_list = {},
-                        kafka_topic = "test",
-                        key= "key1",
-                    },
-                },
-                {
-                    input = {
-                        broker_list = {
-                            ["127.0.0.1"] = "9092"
-                        },
-                        kafka_topic = "test",
-                        key= "key1",
-                    },
-                },
-                {
-                    input = {
-                        broker_list = {
-                            ["127.0.0.1"] = 0
-                        },
-                        kafka_topic = "test",
-                        key= "key1",
-                    },
-                },
-                {
-                    input = {
-                        broker_list = {
-                            ["127.0.0.1"] = 65536
-                        },
-                        kafka_topic = "test",
-                        key= "key1",
-                    },
-                },
-            }
-
-            local plugin = require("apisix.plugins.kafka-logger")
-
-            local err_count = 0
-            for i in ipairs(data) do
-                local ok, err = plugin.check_schema(data[i].input)
-                if not ok then
-                    err_count = err_count + 1
-                    ngx.say(err)
-                end
-            end
-
-            assert(err_count == #data)
-        }
-    }
---- request
-GET /t
---- response_body
-property "broker_list" validation failed: expect object to have at least 1 
properties
-property "broker_list" validation failed: failed to validate 127.0.0.1 
(matching ".*"): wrong type: expected integer, got string
-property "broker_list" validation failed: failed to validate 127.0.0.1 
(matching ".*"): expected 0 to be greater than 1
-property "broker_list" validation failed: failed to validate 127.0.0.1 
(matching ".*"): expected 65536 to be smaller than 65535
---- no_error_log
-[error]
-
-
-
-=== TEST 25: kafka brokers info in log
---- config
-    location /t {
-        content_by_lua_block {
-            local t = require("lib.test_admin").test
-            local code, body = t('/apisix/admin/routes/1',
-                 ngx.HTTP_PUT,
-                 [[{
-                        "plugins": {
-                             "kafka-logger": {
-                                    "broker_list" :
-                                      {
-                                        "127.0.0.127":9092
-                                      },
-                                    "kafka_topic" : "test2",
-                                    "producer_type": "sync",
-                                    "key" : "key1",
-                                    "batch_max_size": 1,
-                                    "cluster_name": 10
-                             }
-                        },
-                        "upstream": {
-                            "nodes": {
-                                "127.0.0.1:1980": 1
-                            },
-                            "type": "roundrobin"
-                        },
-                        "uri": "/hello"
-                }]]
-            )
-            if code >= 300 then
-                ngx.status = code
-            end
-            ngx.say(body)
-            local http = require "resty.http"
-            local httpc = http.new()
-            local uri = "http://127.0.0.1:"; .. ngx.var.server_port .. "/hello"
-            local res, err = httpc:request_uri(uri, {method = "GET"})
-        }
-    }
---- request
-GET /t
---- error_log_like eval
-qr/create new kafka producer instance, brokers: 
\[\{"port":9092,"host":"127.0.0.127"}]/
-qr/failed to send data to Kafka topic: .*, brokers: \{"127.0.0.127":9092}/
-
-
-
-=== TEST 26: set route(id: 1,include_req_body = true,include_req_body_expr = 
array)
---- config
-    location /t {
-        content_by_lua_block {
-            local t = require("lib.test_admin").test
-            local code, body = t('/apisix/admin/routes/1',
-                 ngx.HTTP_PUT,
-                 [=[{
-                        "plugins": {
-                            "kafka-logger": {
-                                "broker_list" :
-                                  {
-                                    "127.0.0.1":9092
-                                  },
-                                "kafka_topic" : "test2",
-                                "key" : "key1",
-                                "timeout" : 1,
-                                "include_req_body": true,
-                                "include_req_body_expr": [
-                                    [
-                                      "arg_name",
-                                      "==",
-                                      "qwerty"
-                                    ]
-                                ],
-                                "batch_max_size": 1
-                            }
-                        },
-                        "upstream": {
-                            "nodes": {
-                                "127.0.0.1:1980": 1
-                            },
-                            "type": "roundrobin"
-                        },
-                        "uri": "/hello"
-                }]=]
-                )
-            if code >= 300 then
-                ngx.status = code
-            end
-            ngx.say(body)
-        }
-    }
-
---- request
-GET /t
---- response_body
-passed
---- no_error_log
-[error]
-
-
-
-=== TEST 27: hit route, expr eval success
---- request
-POST /hello?name=qwerty
-abcdef
---- response_body
-hello world
---- no_error_log
-[error]
---- error_log eval
-qr/send data to kafka: \{.*"body":"abcdef"/
---- wait: 2
-
-
-
-=== TEST 28: hit route,expr eval fail
---- request
-POST /hello?name=zcxv
-abcdef
---- response_body
-hello world
---- no_error_log eval
-qr/send data to kafka: \{.*"body":"abcdef"/
---- wait: 2
-
-
-
-=== TEST 29: check log schema(include_req_body)
---- config
-    location /t {
-        content_by_lua_block {
-            local plugin = require("apisix.plugins.kafka-logger")
-            local ok, err = plugin.check_schema({
-                 kafka_topic = "test",
-                 key = "key1",
-                 broker_list = {
-                    ["127.0.0.1"] = 3
-                 },
-                 include_req_body = true,
-                 include_req_body_expr = {
-                     {"bar", "<>", "foo"}
-                 }
-            })
-            if not ok then
-                ngx.say(err)
-            end
-            ngx.say("done")
-        }
-    }
---- request
-GET /t
---- response_body
-failed to validate the 'include_req_body_expr' expression: invalid operator 
'<>'
-done
---- no_error_log
-[error]
-
-
-
-=== TEST 30: check log schema(include_resp_body)
---- config
-    location /t {
-        content_by_lua_block {
-            local plugin = require("apisix.plugins.kafka-logger")
-            local ok, err = plugin.check_schema({
-                 kafka_topic = "test",
-                 key = "key1",
-                 broker_list = {
-                    ["127.0.0.1"] = 3
-                 },
-                 include_resp_body = true,
-                 include_resp_body_expr = {
-                     {"bar", "<!>", "foo"}
-                 }
-            })
-            if not ok then
-                ngx.say(err)
-            end
-            ngx.say("done")
-        }
-    }
---- request
-GET /t
---- response_body
-failed to validate the 'include_resp_body_expr' expression: invalid operator 
'<!>'
-done
---- no_error_log
-[error]
-
-
-
-=== TEST 31: set route(id: 1,include_resp_body = true,include_resp_body_expr = 
array)
---- config
-    location /t {
-        content_by_lua_block {
-            local t = require("lib.test_admin").test
-            local code, body = t('/apisix/admin/routes/1',
-                 ngx.HTTP_PUT,
-                 [=[{
-                        "plugins": {
-                            "kafka-logger": {
-                                "broker_list" :
-                                  {
-                                    "127.0.0.1":9092
-                                  },
-                                "kafka_topic" : "test2",
-                                "key" : "key1",
-                                "timeout" : 1,
-                                "include_resp_body": true,
-                                "include_resp_body_expr": [
-                                    [
-                                      "arg_name",
-                                      "==",
-                                      "qwerty"
-                                    ]
-                                ],
-                                "batch_max_size": 1
-                            }
-                        },
-                        "upstream": {
-                            "nodes": {
-                                "127.0.0.1:1980": 1
-                            },
-                            "type": "roundrobin"
-                        },
-                        "uri": "/hello"
-                }]=]
-                )
-            if code >= 300 then
-                ngx.status = code
-            end
-            ngx.say(body)
-        }
-    }
-
---- request
-GET /t
---- response_body
-passed
---- no_error_log
-[error]
-
-
-
-=== TEST 32: hit route, expr eval success
---- request
-POST /hello?name=qwerty
-abcdef
---- response_body
-hello world
---- no_error_log
-[error]
---- error_log eval
-qr/send data to kafka: \{.*"body":"hello world\\n"/
---- wait: 2
-
-
-
-=== TEST 33: hit route,expr eval fail
---- request
-POST /hello?name=zcxv
-abcdef
---- response_body
-hello world
---- no_error_log eval
-qr/send data to kafka: \{.*"body":"hello world\\n"/
---- wait: 2
diff --git a/t/plugin/kafka-logger2.t b/t/plugin/kafka-logger2.t
new file mode 100644
index 0000000..73ffec5
--- /dev/null
+++ b/t/plugin/kafka-logger2.t
@@ -0,0 +1,611 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+no_long_string();
+no_root_location();
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    if (!$block->request) {
+        $block->set_value("request", "GET /t");
+    }
+
+    if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
+        $block->set_value("no_error_log", "[error]");
+    }
+});
+
+run_tests;
+
+__DATA__
+
+=== TEST 1: required_acks, matches none of the enum values
+--- config
+    location /t {
+        content_by_lua_block {
+            local plugin = require("apisix.plugins.kafka-logger")
+            local ok, err = plugin.check_schema({
+                broker_list = {
+                    ["127.0.0.1"] = 3000
+                },
+                required_acks = 10,
+                kafka_topic ="test",
+                key= "key1"
+            })
+            if not ok then
+                ngx.say(err)
+            end
+            ngx.say("done")
+        }
+    }
+--- response_body
+property "required_acks" validation failed: matches none of the enum values
+done
+
+
+
+=== TEST 2: report log to kafka, with required_acks(1, 0, -1)
+--- config
+location /t {
+    content_by_lua_block {
+        local data = {
+            {
+                input = {
+                    plugins = {
+                        ["kafka-logger"] = {
+                            broker_list = {
+                                ["127.0.0.1"] = 9092
+                            },
+                            kafka_topic = "test2",
+                            producer_type = "sync",
+                            timeout = 1,
+                            batch_max_size = 1,
+                            required_acks = 1,
+                            meta_format = "origin",
+                        }
+                    },
+                    upstream = {
+                        nodes = {
+                            ["127.0.0.1:1980"] = 1
+                        },
+                        type = "roundrobin"
+                    },
+                    uri = "/hello",
+                },
+            },
+            {
+                input = {
+                    plugins = {
+                        ["kafka-logger"] = {
+                            broker_list = {
+                                ["127.0.0.1"] = 9092
+                            },
+                            kafka_topic = "test2",
+                            producer_type = "sync",
+                            timeout = 1,
+                            batch_max_size = 1,
+                            required_acks = -1,
+                            meta_format = "origin",
+                        }
+                    },
+                    upstream = {
+                        nodes = {
+                            ["127.0.0.1:1980"] = 1
+                        },
+                        type = "roundrobin"
+                    },
+                    uri = "/hello",
+                },
+            },
+            {
+                input = {
+                    plugins = {
+                        ["kafka-logger"] = {
+                            broker_list = {
+                                ["127.0.0.1"] = 9092
+                            },
+                            kafka_topic = "test2",
+                            producer_type = "sync",
+                            timeout = 1,
+                            batch_max_size = 1,
+                            required_acks = 0,
+                            meta_format = "origin",
+                        }
+                    },
+                    upstream = {
+                        nodes = {
+                            ["127.0.0.1:1980"] = 1
+                        },
+                        type = "roundrobin"
+                    },
+                    uri = "/hello",
+                },
+            },
+        }
+
+        local t = require("lib.test_admin").test
+        local err_count = 0
+        for i in ipairs(data) do
+            local code, body = t('/apisix/admin/routes/1', ngx.HTTP_PUT, 
data[i].input)
+
+            if code >= 300 then
+                err_count = err_count + 1
+            end
+            ngx.print(body)
+
+            t('/hello', ngx.HTTP_GET)
+        end
+
+        assert(err_count == 0)
+    }
+}
+--- error_log
+send data to kafka: GET /hello
+send data to kafka: GET /hello
+send data to kafka: GET /hello
+
+
+
+=== TEST 3: update the broker_list and cluster_name, generate different kafka 
producers
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "upstream": {
+                        "nodes": {
+                            "127.0.0.1:1980": 1
+                        },
+                        "type": "roundrobin"
+                    },
+                    "uri": "/hello"
+                }]]
+            )
+            ngx.sleep(0.5)
+
+            if code >= 300 then
+                ngx.status = code
+                ngx.say("fail")
+                return
+            end
+
+            code, body = t('/apisix/admin/global_rules/1',
+                ngx.HTTP_PUT,
+                 [[{
+                    "plugins": {
+                        "kafka-logger": {
+                            "broker_list" : {
+                                "127.0.0.1": 9092
+                            },
+                            "kafka_topic" : "test2",
+                            "timeout" : 1,
+                            "batch_max_size": 1,
+                            "include_req_body": false,
+                            "cluster_name": 1
+                        }
+                    }
+                }]]
+            )
+
+            if code >= 300 then
+                ngx.status = code
+                ngx.say("fail")
+                return
+            end
+
+            t('/hello',ngx.HTTP_GET)
+            ngx.sleep(0.5)
+
+            code, body = t('/apisix/admin/global_rules/1',
+                ngx.HTTP_PUT,
+                 [[{
+                    "plugins": {
+                        "kafka-logger": {
+                            "broker_list" : {
+                                "127.0.0.1": 19092
+                            },
+                            "kafka_topic" : "test4",
+                            "timeout" : 1,
+                            "batch_max_size": 1,
+                            "include_req_body": false,
+                            "cluster_name": 2
+                        }
+                    }
+                }]]
+            )
+
+            if code >= 300 then
+                ngx.status = code
+                ngx.say("fail")
+                return
+            end
+
+            t('/hello',ngx.HTTP_GET)
+            ngx.sleep(0.5)
+
+            ngx.sleep(2)
+            ngx.say("passed")
+        }
+    }
+--- timeout: 10
+--- response
+passed
+--- wait: 5
+--- error_log
+phase_func(): kafka cluster name 1, broker_list[1] port 9092
+phase_func(): kafka cluster name 2, broker_list[1] port 19092
+--- no_error_log eval
+qr/not found topic/
+
+
+
+=== TEST 4: use the topic that does not exist on kafka(even if kafka allows 
auto create topics, first time push messages to kafka would got this error)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/global_rules/1',
+                ngx.HTTP_PUT,
+                 [[{
+                    "plugins": {
+                        "kafka-logger": {
+                            "broker_list" : {
+                                "127.0.0.1": 9092
+                            },
+                            "kafka_topic" : "undefined_topic",
+                            "timeout" : 1,
+                            "batch_max_size": 1,
+                            "include_req_body": false
+                        }
+                    }
+                }]]
+            )
+
+            if code >= 300 then
+                ngx.status = code
+                ngx.say("fail")
+                return
+            end
+
+            t('/hello',ngx.HTTP_GET)
+            ngx.sleep(0.5)
+
+            ngx.sleep(2)
+            ngx.say("passed")
+        }
+    }
+--- timeout: 5
+--- response
+passed
+--- error_log eval
+qr/not found topic, retryable: true, topic: undefined_topic, partition_id: -1/
+
+
+
+=== TEST 5: check broker_list via schema
+--- config
+    location /t {
+        content_by_lua_block {
+            local data = {
+                {
+                    input = {
+                        broker_list = {},
+                        kafka_topic = "test",
+                        key= "key1",
+                    },
+                },
+                {
+                    input = {
+                        broker_list = {
+                            ["127.0.0.1"] = "9092"
+                        },
+                        kafka_topic = "test",
+                        key= "key1",
+                    },
+                },
+                {
+                    input = {
+                        broker_list = {
+                            ["127.0.0.1"] = 0
+                        },
+                        kafka_topic = "test",
+                        key= "key1",
+                    },
+                },
+                {
+                    input = {
+                        broker_list = {
+                            ["127.0.0.1"] = 65536
+                        },
+                        kafka_topic = "test",
+                        key= "key1",
+                    },
+                },
+            }
+
+            local plugin = require("apisix.plugins.kafka-logger")
+
+            local err_count = 0
+            for i in ipairs(data) do
+                local ok, err = plugin.check_schema(data[i].input)
+                if not ok then
+                    err_count = err_count + 1
+                    ngx.say(err)
+                end
+            end
+
+            assert(err_count == #data)
+        }
+    }
+--- response_body
+property "broker_list" validation failed: expect object to have at least 1 
properties
+property "broker_list" validation failed: failed to validate 127.0.0.1 
(matching ".*"): wrong type: expected integer, got string
+property "broker_list" validation failed: failed to validate 127.0.0.1 
(matching ".*"): expected 0 to be greater than 1
+property "broker_list" validation failed: failed to validate 127.0.0.1 
(matching ".*"): expected 65536 to be smaller than 65535
+
+
+
+=== TEST 6: kafka brokers info in log
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                             "kafka-logger": {
+                                    "broker_list" :
+                                      {
+                                        "127.0.0.127":9092
+                                      },
+                                    "kafka_topic" : "test2",
+                                    "producer_type": "sync",
+                                    "key" : "key1",
+                                    "batch_max_size": 1,
+                                    "cluster_name": 10
+                             }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]]
+            )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+            local http = require "resty.http"
+            local httpc = http.new()
+            local uri = "http://127.0.0.1:"; .. ngx.var.server_port .. "/hello"
+            local res, err = httpc:request_uri(uri, {method = "GET"})
+        }
+    }
+--- error_log_like eval
+qr/create new kafka producer instance, brokers: 
\[\{"port":9092,"host":"127.0.0.127"}]/
+qr/failed to send data to Kafka topic: .*, brokers: \{"127.0.0.127":9092}/
+
+
+
+=== TEST 7: set route(id: 1,include_req_body = true,include_req_body_expr = 
array)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [=[{
+                        "plugins": {
+                            "kafka-logger": {
+                                "broker_list" :
+                                  {
+                                    "127.0.0.1":9092
+                                  },
+                                "kafka_topic" : "test2",
+                                "key" : "key1",
+                                "timeout" : 1,
+                                "include_req_body": true,
+                                "include_req_body_expr": [
+                                    [
+                                      "arg_name",
+                                      "==",
+                                      "qwerty"
+                                    ]
+                                ],
+                                "batch_max_size": 1
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]=]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+
+--- response_body
+passed
+
+
+
+=== TEST 8: hit route, expr eval success
+--- request
+POST /hello?name=qwerty
+abcdef
+--- response_body
+hello world
+--- error_log eval
+qr/send data to kafka: \{.*"body":"abcdef"/
+--- wait: 2
+
+
+
+=== TEST 9: hit route,expr eval fail
+--- request
+POST /hello?name=zcxv
+abcdef
+--- response_body
+hello world
+--- no_error_log eval
+qr/send data to kafka: \{.*"body":"abcdef"/
+--- wait: 2
+
+
+
+=== TEST 10: check log schema(include_req_body)
+--- config
+    location /t {
+        content_by_lua_block {
+            local plugin = require("apisix.plugins.kafka-logger")
+            local ok, err = plugin.check_schema({
+                 kafka_topic = "test",
+                 key = "key1",
+                 broker_list = {
+                    ["127.0.0.1"] = 3
+                 },
+                 include_req_body = true,
+                 include_req_body_expr = {
+                     {"bar", "<>", "foo"}
+                 }
+            })
+            if not ok then
+                ngx.say(err)
+            end
+            ngx.say("done")
+        }
+    }
+--- response_body
+failed to validate the 'include_req_body_expr' expression: invalid operator 
'<>'
+done
+
+
+
+=== TEST 11: check log schema(include_resp_body)
+--- config
+    location /t {
+        content_by_lua_block {
+            local plugin = require("apisix.plugins.kafka-logger")
+            local ok, err = plugin.check_schema({
+                 kafka_topic = "test",
+                 key = "key1",
+                 broker_list = {
+                    ["127.0.0.1"] = 3
+                 },
+                 include_resp_body = true,
+                 include_resp_body_expr = {
+                     {"bar", "<!>", "foo"}
+                 }
+            })
+            if not ok then
+                ngx.say(err)
+            end
+            ngx.say("done")
+        }
+    }
+--- response_body
+failed to validate the 'include_resp_body_expr' expression: invalid operator 
'<!>'
+done
+
+
+
+=== TEST 12: set route(id: 1,include_resp_body = true,include_resp_body_expr = 
array)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [=[{
+                        "plugins": {
+                            "kafka-logger": {
+                                "broker_list" :
+                                  {
+                                    "127.0.0.1":9092
+                                  },
+                                "kafka_topic" : "test2",
+                                "key" : "key1",
+                                "timeout" : 1,
+                                "include_resp_body": true,
+                                "include_resp_body_expr": [
+                                    [
+                                      "arg_name",
+                                      "==",
+                                      "qwerty"
+                                    ]
+                                ],
+                                "batch_max_size": 1
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]=]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+
+--- response_body
+passed
+
+
+
+=== TEST 13: hit route, expr eval success
+--- request
+POST /hello?name=qwerty
+abcdef
+--- response_body
+hello world
+--- error_log eval
+qr/send data to kafka: \{.*"body":"hello world\\n"/
+--- wait: 2
+
+
+
+=== TEST 14: hit route,expr eval fail
+--- request
+POST /hello?name=zcxv
+abcdef
+--- response_body
+hello world
+--- no_error_log eval
+qr/send data to kafka: \{.*"body":"hello world\\n"/
+--- wait: 2

Reply via email to