spacewander commented on code in PR #8693:
URL: https://github.com/apache/apisix/pull/8693#discussion_r1089666161
##########
apisix/plugins/error-log-logger.lua:
##########
@@ -272,6 +333,52 @@ local function send_to_clickhouse(log_message)
end
+local function create_producer(broker_list, broker_config, cluster_name)
+ core.log.info("create new kafka producer instance")
+ return producer:new(broker_list, broker_config, cluster_name)
+end
+
+
+local function send_to_kafka(log_message)
+ core.log.info("sending a batch logs to kafka brokers: ",
+ core.json.delay_encode(config.kafka.brokers))
+
+ local broker_config = {}
+ broker_config["request_timeout"] = config.timeout * 1000
+ broker_config["producer_type"] = config.kafka.producer_type
+ broker_config["required_acks"] = config.kafka.required_acks
+
+ local metadata = plugin.plugin_metadata(plugin_name)
+ if not (metadata and metadata.value and metadata.modifiedIndex) then
+ core.log.info("please set the correct plugin_metadata for ",
plugin_name)
+ return
Review Comment:
Should this return a boolean here?
##########
apisix/plugins/error-log-logger.lua:
##########
@@ -272,6 +333,52 @@ local function send_to_clickhouse(log_message)
end
+local function create_producer(broker_list, broker_config, cluster_name)
+ core.log.info("create new kafka producer instance")
+ return producer:new(broker_list, broker_config, cluster_name)
+end
+
+
+local function send_to_kafka(log_message)
+ core.log.info("sending a batch logs to kafka brokers: ",
+ core.json.delay_encode(config.kafka.brokers))
+
+ local broker_config = {}
+ broker_config["request_timeout"] = config.timeout * 1000
+ broker_config["producer_type"] = config.kafka.producer_type
+ broker_config["required_acks"] = config.kafka.required_acks
+
+ local metadata = plugin.plugin_metadata(plugin_name)
+ if not (metadata and metadata.value and metadata.modifiedIndex) then
+ core.log.info("please set the correct plugin_metadata for ",
plugin_name)
+ return
+ end
+
+ -- reuse producer via kafka_prod_lrucache to avoid unbalanced partitions
of messages in kafka
+ local prod, err = kafka_prod_lrucache(plugin_name .. "#kafka",
metadata.modifiedIndex,
Review Comment:
We don't need to add the "#kafka" suffix, as this cache is individual
##########
apisix/plugins/error-log-logger.lua:
##########
@@ -272,6 +333,52 @@ local function send_to_clickhouse(log_message)
end
+local function create_producer(broker_list, broker_config, cluster_name)
+ core.log.info("create new kafka producer instance")
+ return producer:new(broker_list, broker_config, cluster_name)
+end
+
+
+local function send_to_kafka(log_message)
+ core.log.info("sending a batch logs to kafka brokers: ",
+ core.json.delay_encode(config.kafka.brokers))
+
+ local broker_config = {}
+ broker_config["request_timeout"] = config.timeout * 1000
+ broker_config["producer_type"] = config.kafka.producer_type
+ broker_config["required_acks"] = config.kafka.required_acks
+
+ local metadata = plugin.plugin_metadata(plugin_name)
+ if not (metadata and metadata.value and metadata.modifiedIndex) then
+ core.log.info("please set the correct plugin_metadata for ",
plugin_name)
+ return
+ end
+
+ -- reuse producer via kafka_prod_lrucache to avoid unbalanced partitions
of messages in kafka
+ local prod, err = kafka_prod_lrucache(plugin_name .. "#kafka",
metadata.modifiedIndex,
+ create_producer,
config.kafka.brokers, broker_config,
+ config.kafka.cluster_name)
+ if not prod then
+ return false, "get kafka producer failed: " .. err
+ end
+ core.log.info("kafka cluster name ", config.kafka.cluster_name, ",
broker_list[1] port ",
+ prod.client.broker_list[1].port)
Review Comment:
```suggestion
prod.client.broker_list[1].port)
```
##########
apisix/plugins/error-log-logger.lua:
##########
@@ -272,6 +333,52 @@ local function send_to_clickhouse(log_message)
end
+local function create_producer(broker_list, broker_config, cluster_name)
+ core.log.info("create new kafka producer instance")
+ return producer:new(broker_list, broker_config, cluster_name)
+end
+
+
+local function send_to_kafka(log_message)
+ core.log.info("sending a batch logs to kafka brokers: ",
+ core.json.delay_encode(config.kafka.brokers))
+
+ local broker_config = {}
+ broker_config["request_timeout"] = config.timeout * 1000
+ broker_config["producer_type"] = config.kafka.producer_type
+ broker_config["required_acks"] = config.kafka.required_acks
+
+ local metadata = plugin.plugin_metadata(plugin_name)
+ if not (metadata and metadata.value and metadata.modifiedIndex) then
+ core.log.info("please set the correct plugin_metadata for ",
plugin_name)
+ return
+ end
+
+ -- reuse producer via kafka_prod_lrucache to avoid unbalanced partitions
of messages in kafka
+ local prod, err = kafka_prod_lrucache(plugin_name .. "#kafka",
metadata.modifiedIndex,
+ create_producer,
config.kafka.brokers, broker_config,
+ config.kafka.cluster_name)
+ if not prod then
+ return false, "get kafka producer failed: " .. err
+ end
+ core.log.info("kafka cluster name ", config.kafka.cluster_name, ",
broker_list[1] port ",
+ prod.client.broker_list[1].port)
+
+ local ok
+ for i = 1, #log_message, 2 do
+ ok, err = prod:send(config.kafka.kafka_topic,
+ config.kafka.key, core.json.encode(log_message[i]))
+ if not ok then
+ return false, "failed to send data to Kafka topic: " .. err ..
+ ", brokers: " ..
core.json.delay_encode(config.kafka.brokers)
Review Comment:
delay_encode is only for logging
##########
apisix/plugins/error-log-logger.lua:
##########
@@ -272,6 +333,52 @@ local function send_to_clickhouse(log_message)
end
+local function create_producer(broker_list, broker_config, cluster_name)
+ core.log.info("create new kafka producer instance")
+ return producer:new(broker_list, broker_config, cluster_name)
+end
+
+
+local function send_to_kafka(log_message)
+ core.log.info("sending a batch logs to kafka brokers: ",
+ core.json.delay_encode(config.kafka.brokers))
Review Comment:
```suggestion
core.json.delay_encode(config.kafka.brokers))
```
##########
apisix/plugins/error-log-logger.lua:
##########
@@ -272,6 +333,52 @@ local function send_to_clickhouse(log_message)
end
+local function create_producer(broker_list, broker_config, cluster_name)
+ core.log.info("create new kafka producer instance")
+ return producer:new(broker_list, broker_config, cluster_name)
+end
+
+
+local function send_to_kafka(log_message)
+ core.log.info("sending a batch logs to kafka brokers: ",
+ core.json.delay_encode(config.kafka.brokers))
+
+ local broker_config = {}
+ broker_config["request_timeout"] = config.timeout * 1000
+ broker_config["producer_type"] = config.kafka.producer_type
+ broker_config["required_acks"] = config.kafka.required_acks
+
+ local metadata = plugin.plugin_metadata(plugin_name)
Review Comment:
It seems that using the config passed from `process` here might create a race:
suppose we have c1 (config) and m1 (modifiedIndex) in `process`, and c2/m2
in `send`. It looks like we might use m2 as the key and c1 as the value in the
cache below.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]