This is an automated email from the ASF dual-hosted git repository.

spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new d081620  feat: kafka-logger implemented reuse kafka producer (#3429)
d081620 is described below

commit d0816200e6313dacaa42d4ac7515228fe27b8bd6
Author: tzssangglass <[email protected]>
AuthorDate: Sun Jan 31 14:52:13 2021 +0800

    feat: kafka-logger implemented reuse kafka producer (#3429)
---
 .github/workflows/centos7-ci.yml         |  1 +
 .travis/linux_openresty_common_runner.sh |  1 +
 .travis/linux_tengine_runner.sh          |  1 +
 apisix/plugins/kafka-logger.lua          | 91 +++++++++++++++++++++-----------
 t/plugin/kafka-logger.t                  | 69 ++++++++++++++++++++++++
 5 files changed, 132 insertions(+), 31 deletions(-)
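
The heart of the change: instead of building a fresh Kafka producer for
every batch, the plugin now creates one per plugin config and fetches it
from an LRU cache on each request. A condensed sketch of the new flow,
assembled from the hunks below ("conf" and "ctx" are the plugin config and
request context APISIX hands to the log phase; fetch_producer is a name
used here for illustration only — in the diff this logic lives inline in
_M.log):

    local core     = require("apisix.core")
    local producer = require("resty.kafka.producer")
    local pairs    = pairs
    local type     = type

    -- one cache shared by all routes using this plugin; plugin_ctx()
    -- keys entries by the plugin context, so each config gets its own
    -- long-lived producer
    local lrucache = core.lrucache.new({
        type = "plugin",
    })

    local function create_producer(broker_list, broker_config)
        -- only invoked on a cache miss
        return producer:new(broker_list, broker_config)
    end

    local function fetch_producer(conf, ctx)
        local broker_list = core.table.new(core.table.nkeys(conf.broker_list), 0)
        for host, port in pairs(conf.broker_list) do
            if type(host) == "string" and type(port) == "number" then
                core.table.insert(broker_list, {host = host, port = port})
            end
        end

        local broker_config = {request_timeout = conf.timeout * 1000}

        -- returns the cached producer, creating it on first use
        return core.lrucache.plugin_ctx(lrucache, ctx, nil, create_producer,
                                        broker_list, broker_config)
    end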

diff --git a/.github/workflows/centos7-ci.yml b/.github/workflows/centos7-ci.yml
index dc8e0da..e2fcd6b 100644
--- a/.github/workflows/centos7-ci.yml
+++ b/.github/workflows/centos7-ci.yml
@@ -75,6 +75,7 @@ jobs:
        docker run --name eureka -d -p 8761:8761 --env ENVIRONMENT=apisix --env spring.application.name=apisix-eureka --env server.port=8761 --env eureka.instance.ip-address=127.0.0.1 --env eureka.client.registerWithEureka=true --env eureka.client.fetchRegistry=false --env eureka.client.serviceUrl.defaultZone=http://127.0.0.1:8761/eureka/ bitinit/eureka
         sleep 5
        docker exec -i kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server:2181 --replication-factor 1 --partitions 1 --topic test2
+        docker exec -i kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server:2181 --replication-factor 1 --partitions 3 --topic test3
        docker run --rm --name skywalking -d -p 1234:1234 -p 11800:11800 -p 12800:12800 apache/skywalking-oap-server
 
     - name: install dependencies
diff --git a/.travis/linux_openresty_common_runner.sh b/.travis/linux_openresty_common_runner.sh
index e2a95c5..a67c1a9 100755
--- a/.travis/linux_openresty_common_runner.sh
+++ b/.travis/linux_openresty_common_runner.sh
@@ -35,6 +35,7 @@ before_install() {
    docker run --name eureka -d -p 8761:8761 --env ENVIRONMENT=apisix --env spring.application.name=apisix-eureka --env server.port=8761 --env eureka.instance.ip-address=127.0.0.1 --env eureka.client.registerWithEureka=true --env eureka.client.fetchRegistry=false --env eureka.client.serviceUrl.defaultZone=http://127.0.0.1:8761/eureka/ bitinit/eureka
     sleep 5
    docker exec -i kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server:2181 --replication-factor 1 --partitions 1 --topic test2
+    docker exec -i kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server:2181 --replication-factor 1 --partitions 3 --topic test3
 
     # start skywalking
    docker run --rm --name skywalking -d -p 1234:1234 -p 11800:11800 -p 12800:12800 apache/skywalking-oap-server
diff --git a/.travis/linux_tengine_runner.sh b/.travis/linux_tengine_runner.sh
index ad8a5e3..fb0004a 100755
--- a/.travis/linux_tengine_runner.sh
+++ b/.travis/linux_tengine_runner.sh
@@ -35,6 +35,7 @@ before_install() {
    docker run --name eureka -d -p 8761:8761 --env ENVIRONMENT=apisix --env spring.application.name=apisix-eureka --env server.port=8761 --env eureka.instance.ip-address=127.0.0.1 --env eureka.client.registerWithEureka=true --env eureka.client.fetchRegistry=false --env eureka.client.serviceUrl.defaultZone=http://127.0.0.1:8761/eureka/ bitinit/eureka
     sleep 5
    docker exec -i kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server:2181 --replication-factor 1 --partitions 1 --topic test2
+    docker exec -i kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server:2181 --replication-factor 1 --partitions 3 --topic test3
 }
 
 tengine_install() {
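
(Each of the three CI environments above now also creates a "test3" topic
with three partitions; the new TEST 18 at the end of this diff relies on
it to observe messages landing on different partitions.)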
diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua
index 766965b..a545b3b 100644
--- a/apisix/plugins/kafka-logger.lua
+++ b/apisix/plugins/kafka-logger.lua
@@ -20,7 +20,6 @@ local producer = require ("resty.kafka.producer")
 local batch_processor = require("apisix.utils.batch-processor")
 local pairs    = pairs
 local type     = type
-local table    = table
 local ipairs   = ipairs
 local plugin_name = "kafka-logger"
 local stale_timer_running = false
@@ -28,6 +27,11 @@ local timer_at = ngx.timer.at
 local ngx = ngx
 local buffers = {}
 
+
+local lrucache = core.lrucache.new({
+    type = "plugin",
+})
+
 local schema = {
     type = "object",
     properties = {
@@ -66,40 +70,19 @@ function _M.check_schema(conf)
 end
 
 
-local function send_kafka_data(conf, log_message)
-    if core.table.nkeys(conf.broker_list) == 0 then
-        core.log.error("failed to identify the broker specified")
+local function partition_id(sendbuffer, topic, log_message)
+    if not sendbuffer.topics[topic] then
+        core.log.info("current topic in sendbuffer has no message")
+        return nil
     end
-
-    local broker_list = {}
-    local broker_config = {}
-
-    for host, port  in pairs(conf.broker_list) do
-        if type(host) == 'string'
-            and type(port) == 'number' then
-
-            local broker = {
-                host = host, port = port
-            }
-            table.insert(broker_list,broker)
+    for i, message in pairs(sendbuffer.topics[topic]) do
+        if log_message == message.queue[2] then
+            return i
         end
     end
-
-    broker_config["request_timeout"] = conf.timeout * 1000
-
-    local prod, err = producer:new(broker_list,broker_config)
-    if err then
-        return nil, "failed to identify the broker specified: " .. err
-    end
-
-    local ok, err = prod:send(conf.kafka_topic, conf.key, log_message)
-    if not ok then
-        return nil, "failed to send data to Kafka topic: " .. err
-    end
-
-    return true
 end
 
+
 -- remove stale objects from the memory after timer expires
 local function remove_stale_objects(premature)
     if premature then
@@ -118,6 +101,29 @@ local function remove_stale_objects(premature)
 end
 
 
+local function create_producer(broker_list, broker_config)
+    core.log.info("create new kafka producer instance")
+    return producer:new(broker_list, broker_config)
+end
+
+
+local function send_kafka_data(conf, log_message, prod)
+    if core.table.nkeys(conf.broker_list) == 0 then
+        core.log.error("failed to identify the broker specified")
+    end
+
+    local ok, err = prod:send(conf.kafka_topic, conf.key, log_message)
+    core.log.info("partition_id: ", partition_id(prod.sendbuffer,
+            conf.kafka_topic, log_message))
+
+    if not ok then
+        return nil, "failed to send data to Kafka topic: " .. err
+    end
+
+    return true
+end
+
+
 function _M.log(conf, ctx)
     local entry
     if conf.meta_format == "origin" then
@@ -141,6 +147,29 @@ function _M.log(conf, ctx)
         return
     end
 
+    -- reuse producer via lrucache to avoid unbalanced partitions of messages in kafka
+    local broker_list = core.table.new(core.table.nkeys(conf.broker_list), 0)
+    local broker_config = {}
+
+    for host, port in pairs(conf.broker_list) do
+        if type(host) == 'string'
+                and type(port) == 'number' then
+            local broker = {
+                host = host,
+                port = port
+            }
+            core.table.insert(broker_list, broker)
+        end
+    end
+
+    broker_config["request_timeout"] = conf.timeout * 1000
+
+    local prod, err = core.lrucache.plugin_ctx(lrucache, ctx, nil, create_producer,
+                                               broker_list, broker_config)
+    if err then
+        return nil, "failed to identify the broker specified: " .. err
+    end
+
     -- Generate a function to be executed by the batch processor
     local func = function(entries, batch_max_size)
         local data, err
@@ -158,7 +187,7 @@ function _M.log(conf, ctx)
         end
 
         core.log.info("send data to kafka: ", data)
-        return send_kafka_data(conf, data)
+        return send_kafka_data(conf, data, prod)
     end
 
     local config = {
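
Why reusing the producer fixes the unbalanced partitions mentioned in the
comment above: lua-resty-kafka's default partitioner, when a message has no
key, derives the partition from a counter kept on the producer instance, so
a producer recreated for every batch restarts that counter and pins all
traffic to one partition. A paraphrase of that partitioner, offered as an
assumption about lua-resty-kafka's behaviour rather than code from this
commit:

    -- illustrative paraphrase of resty.kafka.producer's default
    -- partitioner (assumed behaviour; not part of this diff)
    local function default_partitioner(key, num_partitions, correlation_id)
        -- keyed messages hash to a stable partition; unkeyed ones
        -- follow the producer's per-instance correlation counter
        local id = key and ngx.crc32_short(key) or correlation_id
        return id % num_partitions
    end

With a single cached producer the counter advances across batches, so the
messages spread over partitions 0, 1 and 2 — which is what the new TEST 18
below asserts via the partition_id() log line added above.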
diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t
index 67e5cb8..7aa2e3b 100644
--- a/t/plugin/kafka-logger.t
+++ b/t/plugin/kafka-logger.t
@@ -576,3 +576,72 @@ hello world
 --- error_log_like eval
 qr/send data to kafka: \{.*"upstream":"127.0.0.1:1980"/
 --- wait: 2
+
+
+
+=== TEST 17: use the topic with 3 partitions
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                        "plugins": {
+                            "kafka-logger": {
+                                "broker_list" : {
+                                    "127.0.0.1": 9092
+                                },
+                                "kafka_topic" : "test3",
+                                "timeout" : 1,
+                                "batch_max_size": 1,
+                                "include_req_body": false
+                            }
+                        },
+                        "upstream": {
+                            "nodes": {
+                                "127.0.0.1:1980": 1
+                            },
+                            "type": "roundrobin"
+                        },
+                        "uri": "/hello"
+                }]]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 18: report log to kafka by different partitions
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            t('/hello',ngx.HTTP_GET)
+            ngx.sleep(0.5)
+            t('/hello',ngx.HTTP_GET)
+            ngx.sleep(0.5)
+            t('/hello',ngx.HTTP_GET)
+            ngx.sleep(0.5)
+        }
+    }
+--- request
+GET /t
+--- timeout: 5s
+--- ignore_response
+--- no_error_log
+[error]
+--- error_log eval
+[qr/partition_id: 1/,
+qr/partition_id: 0/,
+qr/partition_id: 2/]
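
For anyone reproducing TEST 18 by hand, a minimal standalone check, run
with OpenResty's resty CLI against the local broker and the test3 topic
created by the CI scripts above (illustrative only, not part of the
commit; assumes lua-resty-kafka is on the Lua path):

    -- send three unkeyed messages and let the producer's round-robin
    -- spread them across test3's three partitions
    local producer = require("resty.kafka.producer")

    local broker_list = {{host = "127.0.0.1", port = 9092}}
    local prod, err = producer:new(broker_list, {request_timeout = 1000})
    if not prod then
        ngx.say("failed to create producer: ", err)
        return
    end

    for i = 1, 3 do
        -- nil key: the partition comes from the producer's counter
        local ok, send_err = prod:send("test3", nil, "message " .. i)
        if not ok then
            ngx.say("send failed: ", send_err)
            return
        end
    end
    ngx.say("sent 3 messages across the topic's partitions")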
