This is an automated email from the ASF dual-hosted git repository.
spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new 12596ba feat(kafka-logger): add cluster name support (#4876)
12596ba is described below
commit 12596ba6c070fb7ff76cdd5b8722c904218cedae
Author: tzssangglass <[email protected]>
AuthorDate: Thu Aug 26 09:58:12 2021 +0800
feat(kafka-logger): add cluster name support (#4876)
---
apisix/plugins/kafka-logger.lua | 14 ++--
ci/install-ext-services-via-docker.sh | 13 +--
docs/en/latest/plugins/kafka-logger.md | 1 +
docs/zh/latest/plugins/kafka-logger.md | 1 +
t/plugin/kafka-logger.t | 142 +++++++++++++++++++++++++++++++++
5 files changed, 161 insertions(+), 10 deletions(-)
diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua
index f03c55b..763e66c 100644
--- a/apisix/plugins/kafka-logger.lua
+++ b/apisix/plugins/kafka-logger.lua
@@ -30,7 +30,6 @@ local timer_at = ngx.timer.at
local ngx = ngx
local buffers = {}
-
local lrucache = core.lrucache.new({
type = "plugin",
})
@@ -65,7 +64,10 @@ local schema = {
buffer_duration = {type = "integer", minimum = 1, default = 60},
inactive_timeout = {type = "integer", minimum = 1, default = 5},
batch_max_size = {type = "integer", minimum = 1, default = 1000},
- include_req_body = {type = "boolean", default = false}
+ include_req_body = {type = "boolean", default = false},
+ -- in lua-resty-kafka, cluster_name is defined as number
+ -- see https://github.com/doujiang24/lua-resty-kafka#new-1
+ cluster_name = {type = "integer", minimum = 1, default = 1},
},
required = {"broker_list", "kafka_topic"}
}
@@ -139,9 +141,9 @@ local function remove_stale_objects(premature)
end
-local function create_producer(broker_list, broker_config)
+local function create_producer(broker_list, broker_config, cluster_name)
core.log.info("create new kafka producer instance")
- return producer:new(broker_list, broker_config)
+ return producer:new(broker_list, broker_config, cluster_name)
end
@@ -215,7 +217,9 @@ function _M.log(conf, ctx)
broker_config["required_acks"] = conf.required_acks
local prod, err = core.lrucache.plugin_ctx(lrucache, ctx, nil,
create_producer,
- broker_list, broker_config)
+ broker_list, broker_config,
conf.cluster_name)
+ core.log.info("kafka cluster name ", conf.cluster_name, ", broker_list[1]
port ",
+ prod.client.broker_list[1].port)
if err then
return nil, "failed to identify the broker specified: " .. err
end
diff --git a/ci/install-ext-services-via-docker.sh
b/ci/install-ext-services-via-docker.sh
index 8f2ff28..2793de4 100755
--- a/ci/install-ext-services-via-docker.sh
+++ b/ci/install-ext-services-via-docker.sh
@@ -20,14 +20,17 @@ docker run --rm -itd -p 6379:6379 --name apisix_redis
redis:3.0-alpine
docker run --rm -itd -e HTTP_PORT=8888 -e HTTPS_PORT=9999 -p 8888:8888 -p
9999:9999 mendhak/http-https-echo
# Runs Keycloak version 10.0.2 with inbuilt policies for unit tests
docker run --rm -itd -e KEYCLOAK_USER=admin -e KEYCLOAK_PASSWORD=123456 -p
8090:8080 -p 8443:8443 sshniro/keycloak-apisix:1.0.0
-# spin up kafka cluster for tests (1 zookeper and 1 kafka instance)
+# spin up kafka cluster for tests (2 zookeeper and 2 kafka instances)
docker network create kafka-net --driver bridge
-docker run --name zookeeper-server -d -p 2181:2181 --network kafka-net -e
ALLOW_ANONYMOUS_LOGIN=yes bitnami/zookeeper:3.6.0
-docker run --name kafka-server1 -d --network kafka-net -e
ALLOW_PLAINTEXT_LISTENER=yes -e
KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181 -e
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 -p 9092:9092 -e
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true bitnami/kafka:latest
+docker run --name zookeeper-server1 -d -p 2181:2181 --network kafka-net -e
ALLOW_ANONYMOUS_LOGIN=yes bitnami/zookeeper:3.6.0
+docker run --name zookeeper-server2 -d -p 12181:2181 --network kafka-net -e
ALLOW_ANONYMOUS_LOGIN=yes bitnami/zookeeper:3.6.0
+docker run --name kafka-server1 -d --network kafka-net -e
ALLOW_PLAINTEXT_LISTENER=yes -e
KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server1:2181 -e
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 -p 9092:9092 -e
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true bitnami/kafka:latest
+docker run --name kafka-server2 -d --network kafka-net -e
ALLOW_PLAINTEXT_LISTENER=yes -e
KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server2:2181 -e
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 -p 19092:9092 -e
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true bitnami/kafka:latest
docker run --name eureka -d -p 8761:8761 --env ENVIRONMENT=apisix --env
spring.application.name=apisix-eureka --env server.port=8761 --env
eureka.instance.ip-address=127.0.0.1 --env
eureka.client.registerWithEureka=true --env eureka.client.fetchRegistry=false
--env eureka.client.serviceUrl.defaultZone=http://127.0.0.1:8761/eureka/
bitinit/eureka
sleep 5
-docker exec -i kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create
--zookeeper zookeeper-server:2181 --replication-factor 1 --partitions 1 --topic
test2
-docker exec -i kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create
--zookeeper zookeeper-server:2181 --replication-factor 1 --partitions 3 --topic
test3
+docker exec -i kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create
--zookeeper zookeeper-server1:2181 --replication-factor 1 --partitions 1
--topic test2
+docker exec -i kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create
--zookeeper zookeeper-server1:2181 --replication-factor 1 --partitions 3
--topic test3
+docker exec -i kafka-server2 /opt/bitnami/kafka/bin/kafka-topics.sh --create
--zookeeper zookeeper-server2:2181 --replication-factor 1 --partitions 1
--topic test4
# start skywalking
docker run --rm --name skywalking -d -p 1234:1234 -p 11800:11800 -p
12800:12800 apache/skywalking-oap-server:8.3.0-es6
diff --git a/docs/en/latest/plugins/kafka-logger.md
b/docs/en/latest/plugins/kafka-logger.md
index 4da9a23..9f54c5d 100644
--- a/docs/en/latest/plugins/kafka-logger.md
+++ b/docs/en/latest/plugins/kafka-logger.md
@@ -59,6 +59,7 @@ For more info on Batch-Processor in Apache APISIX please
refer.
| max_retry_count | integer | optional | 0 | [0,...] |
Maximum number of retries before removing from the processing pipe line.
|
| retry_delay | integer | optional | 1 | [0,...] | Number
of seconds the process execution should be delayed if the execution fails.
|
| include_req_body | boolean | optional | false | [false, true] |
Whether to include the request body. false: indicates that the requested body
is not included; true: indicates that the requested body is included. |
+| cluster_name | integer | optional | 1 | [1,...] | The
name of the cluster. When there are two or more kafka clusters, you can specify
different names. And this only works with async producer_type.|
### examples of meta_format
diff --git a/docs/zh/latest/plugins/kafka-logger.md
b/docs/zh/latest/plugins/kafka-logger.md
index 05d6f46..75cadf1 100644
--- a/docs/zh/latest/plugins/kafka-logger.md
+++ b/docs/zh/latest/plugins/kafka-logger.md
@@ -57,6 +57,7 @@ title: kafka-logger
| max_retry_count | integer | 可选 | 0 | [0,...] |
从处理管道中移除之前的最大重试次数。 |
| retry_delay | integer | 可选 | 1 | [0,...] |
如果执行失败,则应延迟执行流程的秒数。 |
| include_req_body | boolean | 可选 | false | [false, true] | 是否包括请求
body。false: 表示不包含请求的 body ; true: 表示包含请求的 body 。|
+| cluster_name | integer | 可选 | 1 | [1,...] | kafka
集群的名称。当有两个或多个 kafka 集群时,可以指定不同的名称。只适用于 producer_type 是 async 模式。|
### meta_format 参考示例
diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t
index bf78ef2..d903615 100644
--- a/t/plugin/kafka-logger.t
+++ b/t/plugin/kafka-logger.t
@@ -857,3 +857,145 @@ GET /t
send data to kafka: GET /hello
send data to kafka: GET /hello
send data to kafka: GET /hello
+
+
+
+=== TEST 22: update the broker_list and cluster_name, generate different kafka
producers
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1980": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/hello"
+ }]]
+ )
+ ngx.sleep(0.5)
+
+ if code >= 300 then
+ ngx.status = code
+ ngx.say("fail")
+ return
+ end
+
+ code, body = t('/apisix/admin/global_rules/1',
+ ngx.HTTP_PUT,
+ [[{
+ "plugins": {
+ "kafka-logger": {
+ "broker_list" : {
+ "127.0.0.1": 9092
+ },
+ "kafka_topic" : "test2",
+ "timeout" : 1,
+ "batch_max_size": 1,
+ "include_req_body": false,
+ "cluster_name": 1
+ }
+ }
+ }]]
+ )
+
+ if code >= 300 then
+ ngx.status = code
+ ngx.say("fail")
+ return
+ end
+
+ t('/hello',ngx.HTTP_GET)
+ ngx.sleep(0.5)
+
+ code, body = t('/apisix/admin/global_rules/1',
+ ngx.HTTP_PUT,
+ [[{
+ "plugins": {
+ "kafka-logger": {
+ "broker_list" : {
+ "127.0.0.1": 19092
+ },
+ "kafka_topic" : "test4",
+ "timeout" : 1,
+ "batch_max_size": 1,
+ "include_req_body": false,
+ "cluster_name": 2
+ }
+ }
+ }]]
+ )
+
+ if code >= 300 then
+ ngx.status = code
+ ngx.say("fail")
+ return
+ end
+
+ t('/hello',ngx.HTTP_GET)
+ ngx.sleep(0.5)
+
+ ngx.sleep(2)
+ ngx.say("passed")
+ }
+ }
+--- request
+GET /t
+--- timeout: 10
+--- response
+passed
+--- wait: 5
+--- error_log
+phase_func(): kafka cluster name 1, broker_list[1] port 9092
+phase_func(): kafka cluster name 2, broker_list[1] port 19092
+--- no_error_log eval
+qr/not found topic/
+
+
+
+=== TEST 23: use a topic that does not exist on kafka (even if kafka allows
auto-creating topics, the first push of messages to kafka would get this error)
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/global_rules/1',
+ ngx.HTTP_PUT,
+ [[{
+ "plugins": {
+ "kafka-logger": {
+ "broker_list" : {
+ "127.0.0.1": 9092
+ },
+ "kafka_topic" : "undefined_topic",
+ "timeout" : 1,
+ "batch_max_size": 1,
+ "include_req_body": false
+ }
+ }
+ }]]
+ )
+
+ if code >= 300 then
+ ngx.status = code
+ ngx.say("fail")
+ return
+ end
+
+ t('/hello',ngx.HTTP_GET)
+ ngx.sleep(0.5)
+
+ ngx.sleep(2)
+ ngx.say("passed")
+ }
+ }
+--- request
+GET /t
+--- timeout: 5
+--- response
+passed
+--- error_log eval
+qr/not found topic, retryable: true, topic: undefined_topic, partition_id: -1/