This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 12596ba  feat(kafka-logger): add cluster name support (#4876)
12596ba is described below

commit 12596ba6c070fb7ff76cdd5b8722c904218cedae
Author: tzssangglass <[email protected]>
AuthorDate: Thu Aug 26 09:58:12 2021 +0800

    feat(kafka-logger): add cluster name support (#4876)
---
 apisix/plugins/kafka-logger.lua        |  14 ++--
 ci/install-ext-services-via-docker.sh  |  13 +--
 docs/en/latest/plugins/kafka-logger.md |   1 +
 docs/zh/latest/plugins/kafka-logger.md |   1 +
 t/plugin/kafka-logger.t                | 142 +++++++++++++++++++++++++++++++++
 5 files changed, 161 insertions(+), 10 deletions(-)

diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua
index f03c55b..763e66c 100644
--- a/apisix/plugins/kafka-logger.lua
+++ b/apisix/plugins/kafka-logger.lua
@@ -30,7 +30,6 @@ local timer_at = ngx.timer.at
 local ngx = ngx
 local buffers = {}
 
-
 local lrucache = core.lrucache.new({
     type = "plugin",
 })
@@ -65,7 +64,10 @@ local schema = {
         buffer_duration = {type = "integer", minimum = 1, default = 60},
         inactive_timeout = {type = "integer", minimum = 1, default = 5},
         batch_max_size = {type = "integer", minimum = 1, default = 1000},
-        include_req_body = {type = "boolean", default = false}
+        include_req_body = {type = "boolean", default = false},
+        -- in lua-resty-kafka, cluster_name is defined as number
+        -- see https://github.com/doujiang24/lua-resty-kafka#new-1
+        cluster_name = {type = "integer", minimum = 1, default = 1},
     },
     required = {"broker_list", "kafka_topic"}
 }
@@ -139,9 +141,9 @@ local function remove_stale_objects(premature)
 end
 
 
-local function create_producer(broker_list, broker_config)
+local function create_producer(broker_list, broker_config, cluster_name)
     core.log.info("create new kafka producer instance")
-    return producer:new(broker_list, broker_config)
+    return producer:new(broker_list, broker_config, cluster_name)
 end
 
 
@@ -215,7 +217,9 @@ function _M.log(conf, ctx)
     broker_config["required_acks"] = conf.required_acks
 
     local prod, err = core.lrucache.plugin_ctx(lrucache, ctx, nil, 
create_producer,
-                                               broker_list, broker_config)
+                                               broker_list, broker_config, 
conf.cluster_name)
+    core.log.info("kafka cluster name ", conf.cluster_name, ", broker_list[1] 
port ",
+                  prod.client.broker_list[1].port)
     if err then
         return nil, "failed to identify the broker specified: " .. err
     end
diff --git a/ci/install-ext-services-via-docker.sh 
b/ci/install-ext-services-via-docker.sh
index 8f2ff28..2793de4 100755
--- a/ci/install-ext-services-via-docker.sh
+++ b/ci/install-ext-services-via-docker.sh
@@ -20,14 +20,17 @@ docker run --rm -itd -p 6379:6379 --name apisix_redis 
redis:3.0-alpine
 docker run --rm -itd -e HTTP_PORT=8888 -e HTTPS_PORT=9999 -p 8888:8888 -p 
9999:9999 mendhak/http-https-echo
 # Runs Keycloak version 10.0.2 with inbuilt policies for unit tests
 docker run --rm -itd -e KEYCLOAK_USER=admin -e KEYCLOAK_PASSWORD=123456 -p 
8090:8080 -p 8443:8443 sshniro/keycloak-apisix:1.0.0
-# spin up kafka cluster for tests (1 zookeper and 1 kafka instance)
+# spin up kafka cluster for tests (2 zookeper and 2 kafka instance)
 docker network create kafka-net --driver bridge
-docker run --name zookeeper-server -d -p 2181:2181 --network kafka-net -e 
ALLOW_ANONYMOUS_LOGIN=yes bitnami/zookeeper:3.6.0
-docker run --name kafka-server1 -d --network kafka-net -e 
ALLOW_PLAINTEXT_LISTENER=yes -e 
KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181 -e 
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 -p 9092:9092 -e 
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true bitnami/kafka:latest
+docker run --name zookeeper-server1 -d -p 2181:2181 --network kafka-net -e 
ALLOW_ANONYMOUS_LOGIN=yes bitnami/zookeeper:3.6.0
+docker run --name zookeeper-server2 -d -p 12181:2181 --network kafka-net -e 
ALLOW_ANONYMOUS_LOGIN=yes bitnami/zookeeper:3.6.0
+docker run --name kafka-server1 -d --network kafka-net -e 
ALLOW_PLAINTEXT_LISTENER=yes -e 
KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server1:2181 -e 
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 -p 9092:9092 -e 
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true bitnami/kafka:latest
+docker run --name kafka-server2 -d --network kafka-net -e 
ALLOW_PLAINTEXT_LISTENER=yes -e 
KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server2:2181 -e 
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 -p 19092:9092 -e 
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true bitnami/kafka:latest
 docker run --name eureka -d -p 8761:8761 --env ENVIRONMENT=apisix --env 
spring.application.name=apisix-eureka --env server.port=8761 --env 
eureka.instance.ip-address=127.0.0.1 --env 
eureka.client.registerWithEureka=true --env eureka.client.fetchRegistry=false 
--env eureka.client.serviceUrl.defaultZone=http://127.0.0.1:8761/eureka/ 
bitinit/eureka
 sleep 5
-docker exec -i kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create 
--zookeeper zookeeper-server:2181 --replication-factor 1 --partitions 1 --topic 
test2
-docker exec -i kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create 
--zookeeper zookeeper-server:2181 --replication-factor 1 --partitions 3 --topic 
test3
+docker exec -i kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create 
--zookeeper zookeeper-server1:2181 --replication-factor 1 --partitions 1 
--topic test2
+docker exec -i kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create 
--zookeeper zookeeper-server1:2181 --replication-factor 1 --partitions 3 
--topic test3
+docker exec -i kafka-server2 /opt/bitnami/kafka/bin/kafka-topics.sh --create 
--zookeeper zookeeper-server2:2181 --replication-factor 1 --partitions 1 
--topic test4
 
 # start skywalking
 docker run --rm --name skywalking -d -p 1234:1234 -p 11800:11800 -p 
12800:12800 apache/skywalking-oap-server:8.3.0-es6
diff --git a/docs/en/latest/plugins/kafka-logger.md 
b/docs/en/latest/plugins/kafka-logger.md
index 4da9a23..9f54c5d 100644
--- a/docs/en/latest/plugins/kafka-logger.md
+++ b/docs/en/latest/plugins/kafka-logger.md
@@ -59,6 +59,7 @@ For more info on Batch-Processor in Apache APISIX please 
refer.
 | max_retry_count  | integer | optional    | 0              | [0,...] | 
Maximum number of retries before removing from the processing pipe line.        
         |
 | retry_delay      | integer | optional    | 1              | [0,...] | Number 
of seconds the process execution should be delayed if the execution fails.      
  |
 | include_req_body | boolean | optional    | false          | [false, true] | 
Whether to include the request body. false: indicates that the requested body 
is not included; true: indicates that the requested body is included. |
+| cluster_name     | integer | optional    | 1              | [0,...] | the 
name of the cluster. When there are two or more kafka clusters, you can specify 
different names. And this only works with async producer_type.|
 
 ### examples of meta_format
 
diff --git a/docs/zh/latest/plugins/kafka-logger.md 
b/docs/zh/latest/plugins/kafka-logger.md
index 05d6f46..75cadf1 100644
--- a/docs/zh/latest/plugins/kafka-logger.md
+++ b/docs/zh/latest/plugins/kafka-logger.md
@@ -57,6 +57,7 @@ title: kafka-logger
 | max_retry_count  | integer | 可选   | 0              | [0,...] | 
从处理管道中移除之前的最大重试次数。             |
 | retry_delay      | integer | 可选   | 1              | [0,...] | 
如果执行失败,则应延迟执行流程的秒数。           |
 | include_req_body | boolean | 可选   | false          | [false, true] | 是否包括请求 
body。false: 表示不包含请求的 body ; true: 表示包含请求的 body 。|
+| cluster_name     | integer | 可选   | 1              | [0,...] | kafka 
集群的名称。当有两个或多个 kafka 集群时,可以指定不同的名称。只适用于 producer_type 是 async 模式。|
 
 ### meta_format 参考示例
 
diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t
index bf78ef2..d903615 100644
--- a/t/plugin/kafka-logger.t
+++ b/t/plugin/kafka-logger.t
@@ -857,3 +857,145 @@ GET /t
 send data to kafka: GET /hello
 send data to kafka: GET /hello
 send data to kafka: GET /hello
+
+
+
+=== TEST 22: update the broker_list and cluster_name, generate different kafka 
producers
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "upstream": {
+                        "nodes": {
+                            "127.0.0.1:1980": 1
+                        },
+                        "type": "roundrobin"
+                    },
+                    "uri": "/hello"
+                }]]
+            )
+            ngx.sleep(0.5)
+
+            if code >= 300 then
+                ngx.status = code
+                ngx.say("fail")
+                return
+            end
+
+            code, body = t('/apisix/admin/global_rules/1',
+                ngx.HTTP_PUT,
+                 [[{
+                    "plugins": {
+                        "kafka-logger": {
+                            "broker_list" : {
+                                "127.0.0.1": 9092
+                            },
+                            "kafka_topic" : "test2",
+                            "timeout" : 1,
+                            "batch_max_size": 1,
+                            "include_req_body": false,
+                            "cluster_name": 1
+                        }
+                    }
+                }]]
+            )
+
+            if code >= 300 then
+                ngx.status = code
+                ngx.say("fail")
+                return
+            end
+
+            t('/hello',ngx.HTTP_GET)
+            ngx.sleep(0.5)
+
+            code, body = t('/apisix/admin/global_rules/1',
+                ngx.HTTP_PUT,
+                 [[{
+                    "plugins": {
+                        "kafka-logger": {
+                            "broker_list" : {
+                                "127.0.0.1": 19092
+                            },
+                            "kafka_topic" : "test4",
+                            "timeout" : 1,
+                            "batch_max_size": 1,
+                            "include_req_body": false,
+                            "cluster_name": 2
+                        }
+                    }
+                }]]
+            )
+
+            if code >= 300 then
+                ngx.status = code
+                ngx.say("fail")
+                return
+            end
+
+            t('/hello',ngx.HTTP_GET)
+            ngx.sleep(0.5)
+
+            ngx.sleep(2)
+            ngx.say("passed")
+        }
+    }
+--- request
+GET /t
+--- timeout: 10
+--- response
+passed
+--- wait: 5
+--- error_log
+phase_func(): kafka cluster name 1, broker_list[1] port 9092
+phase_func(): kafka cluster name 2, broker_list[1] port 19092
+--- no_error_log eval
+qr/not found topic/
+
+
+
+=== TEST 23: use the topic that does not exist on kafka(even if kafka allows 
auto create topics, first time push messages to kafka would got this error)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/global_rules/1',
+                ngx.HTTP_PUT,
+                 [[{
+                    "plugins": {
+                        "kafka-logger": {
+                            "broker_list" : {
+                                "127.0.0.1": 9092
+                            },
+                            "kafka_topic" : "undefined_topic",
+                            "timeout" : 1,
+                            "batch_max_size": 1,
+                            "include_req_body": false
+                        }
+                    }
+                }]]
+            )
+
+            if code >= 300 then
+                ngx.status = code
+                ngx.say("fail")
+                return
+            end
+
+            t('/hello',ngx.HTTP_GET)
+            ngx.sleep(0.5)
+
+            ngx.sleep(2)
+            ngx.say("passed")
+        }
+    }
+--- request
+GET /t
+--- timeout: 5
+--- response
+passed
+--- error_log eval
+qr/not found topic, retryable: true, topic: undefined_topic, partition_id: -1/

Reply via email to