This is an automated email from the ASF dual-hosted git repository.
wenming pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-apisix.git
The following commit(s) were added to refs/heads/master by this push:
new 8adafa3 feature: add Kafka Logger plugin. (#1312)
8adafa3 is described below
commit 8adafa3ee280a8c9c71daa3b11b7223389a42f02
Author: Ayeshmantha Perera <[email protected]>
AuthorDate: Wed Mar 25 01:26:45 2020 +0100
feature: add Kafka Logger plugin. (#1312)
---
.travis/linux_openresty_runner.sh | 8 ++
.travis/linux_tengine_runner.sh | 8 ++
conf/config.yaml | 1 +
doc/README.md | 5 +-
doc/README_CN.md | 2 +-
doc/plugins/kafka-logger-cn.md | 130 +++++++++++++++++++
doc/plugins/kafka-logger.md | 134 +++++++++++++++++++
lua/apisix/plugins/kafka-logger.lua | 104 +++++++++++++++
rockspec/apisix-master-0.rockspec | 1 +
t/admin/plugins.t | 2 +-
t/debug/debug-mode.t | 1 +
t/plugin/kafka-logger.t | 251 ++++++++++++++++++++++++++++++++++++
12 files changed, 643 insertions(+), 4 deletions(-)
diff --git a/.travis/linux_openresty_runner.sh b/.travis/linux_openresty_runner.sh
index cff10b8..30ef044 100755
--- a/.travis/linux_openresty_runner.sh
+++ b/.travis/linux_openresty_runner.sh
@@ -34,6 +34,14 @@ before_install() {
sudo cpanm --notest Test::Nginx >build.log 2>&1 || (cat build.log && exit 1)
docker pull redis:3.0-alpine
docker run --rm -itd -p 6379:6379 --name apisix_redis redis:3.0-alpine
+ # spin up a Kafka cluster for tests (1 ZooKeeper and 1 Kafka instance)
+ docker pull bitnami/zookeeper:3.6.0
+ docker pull bitnami/kafka:latest
+ docker network create kafka-net --driver bridge
+ docker run --name zookeeper-server -d -p 2181:2181 --network kafka-net -e ALLOW_ANONYMOUS_LOGIN=yes bitnami/zookeeper:3.6.0
+ docker run --name kafka-server1 -d --network kafka-net -e ALLOW_PLAINTEXT_LISTENER=yes -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181 -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -p 9092:9092 -e KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true bitnami/kafka:latest
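+ # give the broker a few seconds to register with ZooKeeper before creating the test topic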
+ sleep 5
+ docker exec -it kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server:2181 --replication-factor 1 --partitions 1 --topic test2
}
do_install() {
diff --git a/.travis/linux_tengine_runner.sh b/.travis/linux_tengine_runner.sh
index 1a356f9..d14cec2 100755
--- a/.travis/linux_tengine_runner.sh
+++ b/.travis/linux_tengine_runner.sh
@@ -34,6 +34,14 @@ before_install() {
sudo cpanm --notest Test::Nginx >build.log 2>&1 || (cat build.log && exit 1)
docker pull redis:3.0-alpine
docker run --rm -itd -p 6379:6379 --name apisix_redis redis:3.0-alpine
+ # spin up a Kafka cluster for tests (1 ZooKeeper and 1 Kafka instance)
+ docker pull bitnami/zookeeper:3.6.0
+ docker pull bitnami/kafka:latest
+ docker network create kafka-net --driver bridge
+ docker run --name zookeeper-server -d -p 2181:2181 --network kafka-net -e ALLOW_ANONYMOUS_LOGIN=yes bitnami/zookeeper:3.6.0
+ docker run --name kafka-server1 -d --network kafka-net -e ALLOW_PLAINTEXT_LISTENER=yes -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181 -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -p 9092:9092 -e KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true bitnami/kafka:latest
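+ # give the broker a few seconds to register with ZooKeeper before creating the test topic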
+ sleep 5
+ docker exec -it kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server:2181 --replication-factor 1 --partitions 1 --topic test2
}
tengine_install() {
diff --git a/conf/config.yaml b/conf/config.yaml
index 65f79da..f02134b 100644
--- a/conf/config.yaml
+++ b/conf/config.yaml
@@ -143,6 +143,7 @@ plugins: # plugin list
- proxy-cache
- tcp-logger
- proxy-mirror
+ - kafka-logger
stream_plugins:
- mqtt-proxy
diff --git a/doc/README.md b/doc/README.md
index 47c79bc..ad83c5b 100644
--- a/doc/README.md
+++ b/doc/README.md
@@ -59,9 +59,10 @@ Plugins
* [response-rewrite](plugins/response-rewrite.md): Set customized response status code, body and header to the client.
* [fault-injection](plugins/fault-injection.md): The specified response body, response code, and response time can be returned, which provides processing capabilities in different failure scenarios, such as service failure, service overload, and high service delay.
* [proxy-cache](plugins/proxy-cache.md): Provides the ability to cache upstream response data.
-* [tcp-logger](plugins/tcp-logger.md): Log requests to TCP servers
-* [udp-logger](plugins/udp-logger.md): Log requests to UDP servers
+* [tcp-logger](plugins/tcp-logger.md): Log requests to TCP servers.
+* [udp-logger](plugins/udp-logger.md): Log requests to UDP servers.
* [proxy-mirror](plugins/proxy-mirror.md): Provides the ability to mirror client requests.
+* [kafka-logger](plugins/kafka-logger.md): Log requests to external Kafka servers.
Deploy to the Cloud
=======
diff --git a/doc/README_CN.md b/doc/README_CN.md
index b2081d2..f063486 100644
--- a/doc/README_CN.md
+++ b/doc/README_CN.md
@@ -63,4 +63,4 @@ Reference document
* [proxy-mirror](plugins/proxy-mirror-cn.md): The proxy-mirror plugin provides the ability to mirror client requests.
* [udp-logger](plugins/udp-logger.md): Log requests to UDP servers.
* [tcp-logger](plugins/tcp-logger.md): Log requests to TCP servers.
-
+* [kafka-logger](plugins/kafka-logger-cn.md): Log requests to external Kafka servers.
diff --git a/doc/plugins/kafka-logger-cn.md b/doc/plugins/kafka-logger-cn.md
new file mode 100644
index 0000000..5afce2d
--- /dev/null
+++ b/doc/plugins/kafka-logger-cn.md
@@ -0,0 +1,130 @@
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+# Summary
+- [**Name**](#name)
+- [**Attributes**](#attributes)
+- [**Info**](#info)
+- [**How To Enable**](#how-to-enable)
+- [**Test Plugin**](#test-plugin)
+- [**Disable Plugin**](#disable-plugin)
+
+## Name
+
+`kafka-logger` is a plugin which works as a Kafka client driver for the ngx_lua nginx module.
+
+This provides the ability to push request log data as JSON objects to external Kafka clusters.
+
+## Attributes
+
+|Name |Requirement |Description|
+|--------- |--------|-----------|
+| broker_list |required| An array of Kafka brokers.|
+| kafka_topic |required| Target topic to push data to.|
+| timeout |optional| Timeout for the upstream to send data.|
+| async |optional| Boolean value to control whether to perform async push.|
+| key |required| Key for the message.|
+| max_retry |optional| Number of retries.|
+
+## Info
+
+The difference between async and sync data push:
+
+1. In the sync model
+
+    In case of success, it returns the offset (**cdata: LL**) of the current broker and partition.
+    In case of errors, it returns `nil` with a string describing the error.
+
+2. In the async model
+
+    The message is first written to a buffer.
+    It is sent to the Kafka server when the buffer exceeds `batch_num`, or the buffer is flushed every `flush_time`.
+
+    In case of success, it returns `true`.
+    In case of errors, it returns `nil` with a string describing the error (`buffer overflow`).
+
+##### Sample broker list
+
+This plugin supports pushing to more than one broker at a time. Specify the brokers of the external Kafka servers as in the sample below to enable this functionality.
+
+```json
+{
+ "127.0.0.1":9092,
+ "127.0.0.1":9093
+}
+```
+
+## How To Enable
+
+1. Here is an example of how to enable the kafka-logger plugin for a specific route.
+
+```shell
+curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
+{
+    "plugins": {
+ "kafka-logger": {
+ "broker_list" :
+ {
+ "127.0.0.1":9092
+ },
+ "kafka_topic" : "test2",
+ "key" : "key1"
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1980": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/hello"
+}'
+```
+
+## Test Plugin
+
+* success:
+
+```shell
+$ curl -i http://127.0.0.1:9080/hello
+HTTP/1.1 200 OK
+...
+hello, world
+```
+
+## Disable Plugin
+
+When you want to disable the `kafka-logger` plugin, simply remove the corresponding JSON configuration from the plugin configuration. There is no need to restart the service; it takes effect immediately:
+
+```shell
+$ curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
+{
+ "methods": ["GET"],
+ "uri": "/hello",
+ "plugins": {},
+ "upstream": {
+ "type": "roundrobin",
+ "nodes": {
+ "127.0.0.1:1980": 1
+ }
+ }
+}'
+```
diff --git a/doc/plugins/kafka-logger.md b/doc/plugins/kafka-logger.md
new file mode 100644
index 0000000..7bdc533
--- /dev/null
+++ b/doc/plugins/kafka-logger.md
@@ -0,0 +1,134 @@
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+# Summary
+- [**Name**](#name)
+- [**Attributes**](#attributes)
+- [**Info**](#info)
+- [**How To Enable**](#how-to-enable)
+- [**Test Plugin**](#test-plugin)
+- [**Disable Plugin**](#disable-plugin)
+
+
+## Name
+
+`kafka-logger` is a plugin which works as a Kafka client driver for the ngx_lua nginx module.
+
+This provides the ability to push request log data as JSON objects to external Kafka clusters.
+
+## Attributes
+
+|Name |Requirement |Description|
+|--------- |--------|-----------|
+| broker_list |required| An array of Kafka brokers.|
| kafka_topic |required| Target topic to push data to.|
+| timeout |optional|Timeout for the upstream to send data.|
+| async |optional|Boolean value to control whether to perform async push.|
+| key |required|Key for the message.|
| max_retry |optional| Number of retries.|
+
+## Info
+
+The difference between async and sync data push:
+
+1. In the sync model
+
+    In case of success, it returns the offset (**cdata: LL**) of the current broker and partition.
+    In case of errors, it returns `nil` with a string describing the error.
+
+2. In the async model
+
+    The message is first written to a buffer.
+    It is sent to the Kafka server when the buffer exceeds `batch_num`, or the buffer is flushed every `flush_time` (see the sketch below).
+
+    In case of success, it returns `true`.
+    In case of errors, it returns `nil` with a string describing the error (`buffer overflow`).
+
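+For reference, here is a minimal sketch of both modes using the underlying `lua-resty-kafka` producer directly; the broker address, topic, and message are placeholder values:
+
+```lua
+local producer = require "resty.kafka.producer"
+
+local broker_list = {
+    { host = "127.0.0.1", port = 9092 },
+}
+
+-- sync mode (default): send returns the partition offset on success
+local p = producer:new(broker_list)
+local offset, err = p:send("test2", "key1", "log message")
+
+-- async mode: messages are buffered and flushed in the background
+local ap = producer:new(broker_list, { producer_type = "async" })
+local ok, err2 = ap:send("test2", "key1", "log message")
+```
+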
+##### Sample broker list
+
+This plugin supports pushing to more than one broker at a time. Specify the brokers of the external Kafka servers as in the sample below to enable this functionality.
+
+```json
+{
+ "127.0.0.1":9092,
+ "127.0.0.1":9093
+}
+```
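+
+Each entry in this object is read as a `host = port` pair when the plugin builds its broker list.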
+
+## How To Enable
+
+1. Here is an example of how to enable the kafka-logger plugin for a specific route.
+
+```shell
+curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
+{
+    "plugins": {
+ "kafka-logger": {
+ "broker_list" :
+ {
+ "127.0.0.1":9092
+ },
+ "kafka_topic" : "test2",
+ "key" : "key1"
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1980": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/hello"
+}'
+```
+
+## Test Plugin
+
+* success:
+
+```shell
+$ curl -i http://127.0.0.1:9080/hello
+HTTP/1.1 200 OK
+...
+hello, world
+```
+
+## Disable Plugin
+
+When you want to disable the `kafka-logger` plugin, simply remove the corresponding JSON configuration from the plugin configuration. There is no need to restart the service; it takes effect immediately:
+
+```shell
+$ curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
+{
+ "methods": ["GET"],
+ "uri": "/hello",
+ "plugins": {},
+ "upstream": {
+ "type": "roundrobin",
+ "nodes": {
+ "127.0.0.1:1980": 1
+ }
+ }
+}'
+```
diff --git a/lua/apisix/plugins/kafka-logger.lua b/lua/apisix/plugins/kafka-logger.lua
new file mode 100644
index 0000000..8f89af2
--- /dev/null
+++ b/lua/apisix/plugins/kafka-logger.lua
@@ -0,0 +1,104 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+local core = require("apisix.core")
+local log_util = require("apisix.utils.log-util")
+local producer = require("resty.kafka.producer")
+local pairs = pairs
+local type = type
+local table = table
+
+local plugin_name = "kafka-logger"
+local ngx = ngx
+
+local timer_at = ngx.timer.at
+
+local schema = {
+    type = "object",
+    properties = {
+        broker_list = {
+            type = "object"
+        },
+        timeout = {   -- timeout in milliseconds
+            type = "integer", minimum = 1, default = 2000
+        },
+        kafka_topic = {type = "string"},
+        async = {type = "boolean", default = false},
+        key = {type = "string"},
+        max_retry = {type = "integer", minimum = 0, default = 3},
+    },
+    required = {"broker_list", "kafka_topic", "key"}
+}
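+
+-- A conf table that passes this schema looks like (illustrative values):
+--   {broker_list = {["127.0.0.1"] = 9092}, kafka_topic = "test2", key = "key1"}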
+
+local _M = {
+    version = 0.1,
+    priority = 403,
+    name = plugin_name,
+    schema = schema,
+}
+
+function _M.check_schema(conf)
+    return core.schema.check(schema, conf)
+end
+
+local function log(premature, conf, log_message)
+    if premature then
+        return
+    end
+
+    if core.table.nkeys(conf.broker_list) == 0 then
+        core.log.error("failed to identify the broker specified")
+        return
+    end
+
+    local broker_list = {}
+    local broker_config = {}
+
+    -- convert the host -> port map into the broker array expected by
+    -- lua-resty-kafka
+    for host, port in pairs(conf.broker_list) do
+        if type(host) == 'string'
+           and type(port) == 'number' then
+
+            local broker = {
+                host = host, port = port
+            }
+            table.insert(broker_list, broker)
+        end
+    end
+
+    broker_config["request_timeout"] = conf.timeout
+    broker_config["max_retry"] = conf.max_retry
+
+    -- async producers will queue logs and push them when the buffer
+    -- exceeds its size limit
+    if conf.async then
+        broker_config["producer_type"] = "async"
+    end
+
+    local prod, err = producer:new(broker_list, broker_config)
+    if err then
+        core.log.error("failed to create kafka producer: ", err)
+        return
+    end
+
+    local ok, err = prod:send(conf.kafka_topic, conf.key, log_message)
+    if not ok then
+        core.log.error("failed to send data to Kafka topic: ", err)
+    end
+end
+
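+-- encode the full request log and push it from a 0-delay timer, so the
+-- Kafka send happens off the request path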
+function _M.log(conf)
+    return timer_at(0, log, conf, core.json.encode(log_util.get_full_log(ngx)))
+end
+
+return _M
diff --git a/rockspec/apisix-master-0.rockspec b/rockspec/apisix-master-0.rockspec
index 7a34794..98424c6 100644
--- a/rockspec/apisix-master-0.rockspec
+++ b/rockspec/apisix-master-0.rockspec
@@ -48,6 +48,7 @@ dependencies = {
"lua-resty-prometheus = 1.0",
"jsonschema = 0.8",
"lua-resty-ipmatcher = 0.6",
+ "lua-resty-kafka = 0.07",
}
build = {
diff --git a/t/admin/plugins.t b/t/admin/plugins.t
index b8771c7..ee4e0ab 100644
--- a/t/admin/plugins.t
+++ b/t/admin/plugins.t
@@ -30,7 +30,7 @@ __DATA__
--- request
GET /apisix/admin/plugins/list
--- response_body_like eval
-qr/\["limit-req","limit-count","limit-conn","key-auth","basic-auth","prometheus","node-status","jwt-auth","zipkin","ip-restriction","grpc-transcode","serverless-pre-function","serverless-post-function","openid-connect","proxy-rewrite","redirect","response-rewrite","fault-injection","udp-logger","wolf-rbac","proxy-cache","tcp-logger","proxy-mirror"\]/
+qr/\["limit-req","limit-count","limit-conn","key-auth","basic-auth","prometheus","node-status","jwt-auth","zipkin","ip-restriction","grpc-transcode","serverless-pre-function","serverless-post-function","openid-connect","proxy-rewrite","redirect","response-rewrite","fault-injection","udp-logger","wolf-rbac","proxy-cache","tcp-logger","proxy-mirror","kafka-logger"\]/
--- no_error_log
[error]
diff --git a/t/debug/debug-mode.t b/t/debug/debug-mode.t
index d5a6c1d..fae5984 100644
--- a/t/debug/debug-mode.t
+++ b/t/debug/debug-mode.t
@@ -75,6 +75,7 @@ loaded plugin and sort by priority: 899 name: response-rewrite
loaded plugin and sort by priority: 506 name: grpc-transcode
loaded plugin and sort by priority: 500 name: prometheus
loaded plugin and sort by priority: 405 name: tcp-logger
+loaded plugin and sort by priority: 403 name: kafka-logger
loaded plugin and sort by priority: 400 name: udp-logger
loaded plugin and sort by priority: 0 name: example-plugin
loaded plugin and sort by priority: -1000 name: zipkin
diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t
new file mode 100644
index 0000000..e9344d6
--- /dev/null
+++ b/t/plugin/kafka-logger.t
@@ -0,0 +1,251 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+no_long_string();
+no_root_location();
+run_tests;
+
+__DATA__
+
+=== TEST 1: sanity
+--- config
+ location /t {
+ content_by_lua_block {
+ local plugin = require("apisix.plugins.kafka-logger")
+ local ok, err = plugin.check_schema({
+ kafka_topic = "test",
+ key = "key1",
+ broker_list = {
+ ["127.0.0.1"] = 3
+ }
+ })
+ if not ok then
+ ngx.say(err)
+ end
+ ngx.say("done")
+ }
+ }
+--- request
+GET /t
+--- response_body
+done
+--- no_error_log
+[error]
+
+
+
+=== TEST 2: missing broker list
+--- config
+ location /t {
+ content_by_lua_block {
+ local plugin = require("apisix.plugins.kafka-logger")
+ local ok, err = plugin.check_schema({kafka_topic = "test", key = "key1"})
+ if not ok then
+ ngx.say(err)
+ end
+ ngx.say("done")
+ }
+ }
+--- request
+GET /t
+--- response_body
+property "broker_list" is required
+done
+--- no_error_log
+[error]
+
+
+
+=== TEST 3: wrong type for timeout
+--- config
+ location /t {
+ content_by_lua_block {
+ local plugin = require("apisix.plugins.kafka-logger")
+ local ok, err = plugin.check_schema({
+ broker_list = {
+ ["127.0.0.1"] = 3000
+ },
+ timeout = "10",
+ kafka_topic = "test",
+ key = "key1"
+ })
+ if not ok then
+ ngx.say(err)
+ end
+ ngx.say("done")
+ }
+ }
+--- request
+GET /t
+--- response_body
+property "timeout" validation failed: wrong type: expected integer, got string
+done
+--- no_error_log
+[error]
+
+
+
+=== TEST 4: add plugin
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "plugins": {
+ "kafka-logger": {
+ "broker_list" :
+ {
+ "127.0.0.1":9092
+ },
+ "kafka_topic" : "test2",
+ "key" : "key1"
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1980": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/hello"
+ }]],
+ [[{
+ "node": {
+ "value": {
+ "plugins": {
+ "kafka-logger": {
+ "broker_list" :
+ {
+ "127.0.0.1":9092
+ },
+ "kafka_topic" : "test2",
+ "key" : "key1"
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1980": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/hello"
+ },
+ "key": "/apisix/routes/1"
+ },
+ "action": "set"
+ }]]
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 5: access
+--- request
+GET /hello
+--- response_body
+hello world
+--- no_error_log
+[error]
+--- wait: 0.2
+
+
+
+=== TEST 6: error log
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "plugins": {
+ "kafka-logger": {
+ "broker_list" :
+ {
+ "127.0.0.1":9092,
+ "127.0.0.1":9093
+ },
+ "kafka_topic" : "test2",
+ "key" : "key1"
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1980": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/hello"
+ }]],
+ [[{
+ "node": {
+ "value": {
+ "plugins": {
+ "kafka-logger": {
+ "broker_list" :
+ {
+ "127.0.0.1":9092,
+ "127.0.0.1":9093
+ },
+ "kafka_topic" : "test2",
+ "key" : "key1"
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1980": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/hello"
+ },
+ "key": "/apisix/routes/1"
+ },
+ "action": "set"
+ }]]
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ local http = require "resty.http"
+ local httpc = http.new()
+ local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
+ local res, err = httpc:request_uri(uri, {method = "GET"})
+ }
+ }
+--- request
+GET /t
+--- error_log
+failed to send data to Kafka topic
+[error]
+--- wait: 0.2