This is an automated email from the ASF dual-hosted git repository.
spacewander pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new d081620 feat: kafka-logger implemented reuse kafka producer (#3429)
d081620 is described below
commit d0816200e6313dacaa42d4ac7515228fe27b8bd6
Author: tzssangglass <[email protected]>
AuthorDate: Sun Jan 31 14:52:13 2021 +0800
feat: kafka-logger implemented reuse kafka producer (#3429)
---
.github/workflows/centos7-ci.yml | 1 +
.travis/linux_openresty_common_runner.sh | 1 +
.travis/linux_tengine_runner.sh | 1 +
apisix/plugins/kafka-logger.lua | 91 +++++++++++++++++++++-----------
t/plugin/kafka-logger.t | 69 ++++++++++++++++++++++++
5 files changed, 132 insertions(+), 31 deletions(-)
diff --git a/.github/workflows/centos7-ci.yml b/.github/workflows/centos7-ci.yml
index dc8e0da..e2fcd6b 100644
--- a/.github/workflows/centos7-ci.yml
+++ b/.github/workflows/centos7-ci.yml
@@ -75,6 +75,7 @@ jobs:
docker run --name eureka -d -p 8761:8761 --env ENVIRONMENT=apisix
--env spring.application.name=apisix-eureka --env server.port=8761 --env
eureka.instance.ip-address=127.0.0.1 --env
eureka.client.registerWithEureka=true --env eureka.client.fetchRegistry=false
--env eureka.client.serviceUrl.defaultZone=http://127.0.0.1:8761/eureka/
bitinit/eureka
sleep 5
docker exec -i kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh
--create --zookeeper zookeeper-server:2181 --replication-factor 1 --partitions
1 --topic test2
+ docker exec -i kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh
--create --zookeeper zookeeper-server:2181 --replication-factor 1 --partitions
3 --topic test3
docker run --rm --name skywalking -d -p 1234:1234 -p 11800:11800 -p
12800:12800 apache/skywalking-oap-server
- name: install dependencies
diff --git a/.travis/linux_openresty_common_runner.sh
b/.travis/linux_openresty_common_runner.sh
index e2a95c5..a67c1a9 100755
--- a/.travis/linux_openresty_common_runner.sh
+++ b/.travis/linux_openresty_common_runner.sh
@@ -35,6 +35,7 @@ before_install() {
docker run --name eureka -d -p 8761:8761 --env ENVIRONMENT=apisix --env
spring.application.name=apisix-eureka --env server.port=8761 --env
eureka.instance.ip-address=127.0.0.1 --env
eureka.client.registerWithEureka=true --env eureka.client.fetchRegistry=false
--env eureka.client.serviceUrl.defaultZone=http://127.0.0.1:8761/eureka/
bitinit/eureka
sleep 5
docker exec -i kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh
--create --zookeeper zookeeper-server:2181 --replication-factor 1 --partitions
1 --topic test2
+ docker exec -i kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh
--create --zookeeper zookeeper-server:2181 --replication-factor 1 --partitions
3 --topic test3
# start skywalking
docker run --rm --name skywalking -d -p 1234:1234 -p 11800:11800 -p
12800:12800 apache/skywalking-oap-server
diff --git a/.travis/linux_tengine_runner.sh b/.travis/linux_tengine_runner.sh
index ad8a5e3..fb0004a 100755
--- a/.travis/linux_tengine_runner.sh
+++ b/.travis/linux_tengine_runner.sh
@@ -35,6 +35,7 @@ before_install() {
docker run --name eureka -d -p 8761:8761 --env ENVIRONMENT=apisix --env
spring.application.name=apisix-eureka --env server.port=8761 --env
eureka.instance.ip-address=127.0.0.1 --env
eureka.client.registerWithEureka=true --env eureka.client.fetchRegistry=false
--env eureka.client.serviceUrl.defaultZone=http://127.0.0.1:8761/eureka/
bitinit/eureka
sleep 5
docker exec -i kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh
--create --zookeeper zookeeper-server:2181 --replication-factor 1 --partitions
1 --topic test2
+ docker exec -i kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh
--create --zookeeper zookeeper-server:2181 --replication-factor 1 --partitions
3 --topic test3
}
tengine_install() {
diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua
index 766965b..a545b3b 100644
--- a/apisix/plugins/kafka-logger.lua
+++ b/apisix/plugins/kafka-logger.lua
@@ -20,7 +20,6 @@ local producer = require ("resty.kafka.producer")
local batch_processor = require("apisix.utils.batch-processor")
local pairs = pairs
local type = type
-local table = table
local ipairs = ipairs
local plugin_name = "kafka-logger"
local stale_timer_running = false
@@ -28,6 +27,11 @@ local timer_at = ngx.timer.at
local ngx = ngx
local buffers = {}
+
+local lrucache = core.lrucache.new({
+ type = "plugin",
+})
+
local schema = {
type = "object",
properties = {
@@ -66,40 +70,19 @@ function _M.check_schema(conf)
end
-local function send_kafka_data(conf, log_message)
- if core.table.nkeys(conf.broker_list) == 0 then
- core.log.error("failed to identify the broker specified")
+local function partition_id(sendbuffer, topic, log_message)
+ if not sendbuffer.topics[topic] then
+ core.log.info("current topic in sendbuffer has no message")
+ return nil
end
-
- local broker_list = {}
- local broker_config = {}
-
- for host, port in pairs(conf.broker_list) do
- if type(host) == 'string'
- and type(port) == 'number' then
-
- local broker = {
- host = host, port = port
- }
- table.insert(broker_list,broker)
+ for i, message in pairs(sendbuffer.topics[topic]) do
+ if log_message == message.queue[2] then
+ return i
end
end
-
- broker_config["request_timeout"] = conf.timeout * 1000
-
- local prod, err = producer:new(broker_list,broker_config)
- if err then
- return nil, "failed to identify the broker specified: " .. err
- end
-
- local ok, err = prod:send(conf.kafka_topic, conf.key, log_message)
- if not ok then
- return nil, "failed to send data to Kafka topic: " .. err
- end
-
- return true
end
+
-- remove stale objects from the memory after timer expires
local function remove_stale_objects(premature)
if premature then
@@ -118,6 +101,29 @@ local function remove_stale_objects(premature)
end
+local function create_producer(broker_list, broker_config)
+ core.log.info("create new kafka producer instance")
+ return producer:new(broker_list, broker_config)
+end
+
+
+local function send_kafka_data(conf, log_message, prod)
+ if core.table.nkeys(conf.broker_list) == 0 then
+ core.log.error("failed to identify the broker specified")
+ end
+
+ local ok, err = prod:send(conf.kafka_topic, conf.key, log_message)
+ core.log.info("partition_id: ", partition_id(prod.sendbuffer,
+ conf.kafka_topic, log_message))
+
+ if not ok then
+ return nil, "failed to send data to Kafka topic: " .. err
+ end
+
+ return true
+end
+
+
function _M.log(conf, ctx)
local entry
if conf.meta_format == "origin" then
@@ -141,6 +147,29 @@ function _M.log(conf, ctx)
return
end
+ -- reuse producer via lrucache to avoid unbalanced partitions of messages
in kafka
+ local broker_list = core.table.new(core.table.nkeys(conf.broker_list), 0)
+ local broker_config = {}
+
+ for host, port in pairs(conf.broker_list) do
+ if type(host) == 'string'
+ and type(port) == 'number' then
+ local broker = {
+ host = host,
+ port = port
+ }
+ core.table.insert(broker_list, broker)
+ end
+ end
+
+ broker_config["request_timeout"] = conf.timeout * 1000
+
+ local prod, err = core.lrucache.plugin_ctx(lrucache, ctx, nil,
create_producer,
+ broker_list, broker_config)
+ if err then
+ return nil, "failed to identify the broker specified: " .. err
+ end
+
-- Generate a function to be executed by the batch processor
local func = function(entries, batch_max_size)
local data, err
@@ -158,7 +187,7 @@ function _M.log(conf, ctx)
end
core.log.info("send data to kafka: ", data)
- return send_kafka_data(conf, data)
+ return send_kafka_data(conf, data, prod)
end
local config = {
diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t
index 67e5cb8..7aa2e3b 100644
--- a/t/plugin/kafka-logger.t
+++ b/t/plugin/kafka-logger.t
@@ -576,3 +576,72 @@ hello world
--- error_log_like eval
qr/send data to kafka: \{.*"upstream":"127.0.0.1:1980"/
--- wait: 2
+
+
+
+=== TEST 17: use the topic with 3 partitions
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "plugins": {
+ "kafka-logger": {
+ "broker_list" : {
+ "127.0.0.1": 9092
+ },
+ "kafka_topic" : "test3",
+ "timeout" : 1,
+ "batch_max_size": 1,
+ "include_req_body": false
+ }
+ },
+ "upstream": {
+ "nodes": {
+ "127.0.0.1:1980": 1
+ },
+ "type": "roundrobin"
+ },
+ "uri": "/hello"
+ }]]
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 18: report log to kafka by different partitions
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ t('/hello',ngx.HTTP_GET)
+ ngx.sleep(0.5)
+ t('/hello',ngx.HTTP_GET)
+ ngx.sleep(0.5)
+ t('/hello',ngx.HTTP_GET)
+ ngx.sleep(0.5)
+ }
+ }
+--- request
+GET /t
+--- timeout: 5s
+--- ignore_response
+--- no_error_log
+[error]
+--- error_log eval
+[qr/partition_id: 1/,
+qr/partition_id: 0/,
+qr/partition_id: 2/]