spacewander commented on code in PR #8693:
URL: https://github.com/apache/apisix/pull/8693#discussion_r1071765774


##########
apisix/plugins/error-log-logger.lua:
##########
@@ -272,6 +330,55 @@ local function send_to_clickhouse(log_message)
 end
 
 
+local function create_producer(broker_list, broker_config, cluster_name)
+    core.log.info("create new kafka producer instance")
+    return producer:new(broker_list, broker_config, cluster_name)
+end
+
+
+local function send_to_kafka(log_message)
+    core.log.info("sending a batch logs to kafka brokers: ", 
core.json.encode(config.kafka.brokers))
+
+    local broker_config = {}
+    broker_config["request_timeout"] = config.timeout * 1000
+    broker_config["producer_type"] = config.kafka.producer_type
+    broker_config["required_acks"] = config.kafka.required_acks
+
+    local prod
+    local err
+
+    local metadata = plugin.plugin_metadata(plugin_name)
+    if not (metadata and metadata.value and metadata.modifiedIndex) then
+        core.log.info("please set the correct plugin_metadata for ", 
plugin_name)
+        return
+    else
+        -- reuse producer via lrucache to avoid unbalanced partitions of messages in kafka
+        prod, err = lrucache(plugin_name .. "#kafka", metadata.modifiedIndex,
+                             create_producer, config.kafka.brokers, broker_config,
+                             config.kafka.cluster_name)
+        if not prod then
+            return false, "get kafka producer failed " .. err

Review Comment:
   ```suggestion
               return false, "get kafka producer failed: " .. err
   ```



##########
apisix/plugins/error-log-logger.lua:
##########
@@ -19,6 +19,7 @@ local core = require("apisix.core")
 local errlog = require("ngx.errlog")
 local batch_processor = require("apisix.utils.batch-processor")
 local plugin = require("apisix.plugin")
+local producer = require ("resty.kafka.producer")
 local timers = require("apisix.timers")
 local http = require("resty.http")

Review Comment:
   Let's group the resty modules together.
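
   A minimal sketch of the grouping (the ordering inside each group is just a suggestion):

   ```lua
   local core = require("apisix.core")
   local errlog = require("ngx.errlog")
   local batch_processor = require("apisix.utils.batch-processor")
   local plugin = require("apisix.plugin")
   local timers = require("apisix.timers")
   -- keep the resty.* requires next to each other
   local http = require("resty.http")
   local producer = require("resty.kafka.producer")
   ```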



##########
apisix/plugins/error-log-logger.lua:
##########
@@ -272,6 +330,55 @@ local function send_to_clickhouse(log_message)
 end
 
 
+local function create_producer(broker_list, broker_config, cluster_name)
+    core.log.info("create new kafka producer instance")
+    return producer:new(broker_list, broker_config, cluster_name)
+end
+
+
+local function send_to_kafka(log_message)
+    core.log.info("sending a batch logs to kafka brokers: ", 
core.json.encode(config.kafka.brokers))
+
+    local broker_config = {}
+    broker_config["request_timeout"] = config.timeout * 1000
+    broker_config["producer_type"] = config.kafka.producer_type
+    broker_config["required_acks"] = config.kafka.required_acks
+
+    local prod
+    local err
+
+    local metadata = plugin.plugin_metadata(plugin_name)
+    if not (metadata and metadata.value and metadata.modifiedIndex) then
+        core.log.info("please set the correct plugin_metadata for ", 
plugin_name)
+        return
+    else

Review Comment:
   We can use `end` here; no need to nest the code.
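
   For example, the branch can be flattened with an early return (same logic as the diff, just unnested):

   ```lua
   local metadata = plugin.plugin_metadata(plugin_name)
   if not (metadata and metadata.value and metadata.modifiedIndex) then
       core.log.info("please set the correct plugin_metadata for ", plugin_name)
       return
   end

   -- reuse producer via lrucache to avoid unbalanced partitions of messages in kafka
   prod, err = lrucache(plugin_name .. "#kafka", metadata.modifiedIndex,
                        create_producer, config.kafka.brokers, broker_config,
                        config.kafka.cluster_name)
   if not prod then
       return false, "get kafka producer failed: " .. err
   end
   ```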



##########
apisix/plugins/error-log-logger.lua:
##########
@@ -272,6 +330,55 @@ local function send_to_clickhouse(log_message)
 end
 
 
+local function create_producer(broker_list, broker_config, cluster_name)
+    core.log.info("create new kafka producer instance")
+    return producer:new(broker_list, broker_config, cluster_name)
+end
+
+
+local function send_to_kafka(log_message)
+    core.log.info("sending a batch logs to kafka brokers: ", 
core.json.encode(config.kafka.brokers))
+
+    local broker_config = {}
+    broker_config["request_timeout"] = config.timeout * 1000
+    broker_config["producer_type"] = config.kafka.producer_type
+    broker_config["required_acks"] = config.kafka.required_acks
+
+    local prod
+    local err
+
+    local metadata = plugin.plugin_metadata(plugin_name)
+    if not (metadata and metadata.value and metadata.modifiedIndex) then
+        core.log.info("please set the correct plugin_metadata for ", 
plugin_name)
+        return
+    else
+        -- reuse producer via lrucache to avoid unbalanced partitions of messages in kafka
+        prod, err = lrucache(plugin_name .. "#kafka", metadata.modifiedIndex,
+                             create_producer, config.kafka.brokers, broker_config,
+                             config.kafka.cluster_name)
+        if not prod then
+            return false, "get kafka producer failed " .. err
+        end
+        core.log.info("kafka cluster name ", config.kafka.cluster_name, ", 
broker_list[1] port ",
+                      prod.client.broker_list[1].port)
+    end
+
+
+    local ok
+    for i = 1, #log_message, 2 do
+        ok, err = prod:send(config.kafka.kafka_topic,
+                            config.kafka.key, core.json.encode(log_message[i]))
+        if not ok then
+            return false, "failed to send data to Kafka topic: " .. err ..
+                    ", brokers: " .. core.json.encode(config.kafka.brokers)
+        end
+        core.log.info("send data to kafka: ", core.json.encode(log_message[i]))

Review Comment:
   Can we use delay_encode?
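
   For instance (sketch; `core.json.delay_encode` defers the JSON serialization until the log entry is actually written, so the encoding cost is skipped when info-level logs are filtered out):

   ```lua
   core.log.info("send data to kafka: ", core.json.delay_encode(log_message[i]))
   ```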



##########
apisix/plugins/error-log-logger.lua:
##########
@@ -272,6 +330,55 @@ local function send_to_clickhouse(log_message)
 end
 
 
+local function create_producer(broker_list, broker_config, cluster_name)
+    core.log.info("create new kafka producer instance")
+    return producer:new(broker_list, broker_config, cluster_name)
+end
+
+
+local function send_to_kafka(log_message)
+    core.log.info("sending a batch logs to kafka brokers: ", 
core.json.encode(config.kafka.brokers))
+
+    local broker_config = {}
+    broker_config["request_timeout"] = config.timeout * 1000

Review Comment:
   Where does the config come from? This function doesn't have an argument called `config`.
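
   If it's meant to be the module-level `config`, a short comment would make that clear; otherwise it could be passed in explicitly, e.g. (a sketch only, not the current signature):

   ```lua
   local function send_to_kafka(log_message, config)
       -- config is now an explicit argument instead of an upvalue
   end
   ```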



##########
t/plugin/error-log-logger-kafka.t:
##########
@@ -0,0 +1,195 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+log_level("info");
+repeat_each(1);
+no_long_string();
+no_root_location();
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    if (!defined $block->request) {
+        $block->set_value("request", "GET /t");
+    }
+
+    if (!defined $block->extra_yaml_config) {
+        my $extra_yaml_config = <<_EOC_;
+plugins:
+    - error-log-logger
+_EOC_
+        $block->set_value("extra_yaml_config", $extra_yaml_config);
+    }
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: test schema checker
+--- config
+    location /t {
+        content_by_lua_block {
+        local core = require("apisix.core")
+            local plugin = require("apisix.plugins.error-log-logger")
+            local ok, err = plugin.check_schema(
+                {
+                    kafka = {
+                        brokers = {
+                            {
+                                host = "127.0.0.1",
+                                port = 9092
+                            }
+                        },
+                        kafka_topic = "test2"
+                    }
+                },
+                core.schema.TYPE_METADATA
+            )
+            if not ok then
+                ngx.say(err)
+            end
+
+            ngx.say("done")
+        }
+    }
+--- response_body
+done
+
+
+
+=== TEST 2: put plugin metadata and log an error level message - no auth kafka
+--- config
+    location /t {
+        content_by_lua_block {
+            local core = require("apisix.core")
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/plugin_metadata/error-log-logger',
+                ngx.HTTP_PUT,
+                [[{
+                    "kafka": {
+                        "brokers": [{
+                            "host": "127.0.0.1",
+                            "port": 9092
+                        }],
+                        "kafka_topic": "test2"
+                    },
+                    "level": "ERROR",
+                    "inactive_timeout": 1
+                }]]
+                )
+            ngx.sleep(2)
+            core.log.error("this is a error message for test2.")
+        }
+    }
+--- response_body
+--- error_log eval
+[qr/this is a error message for test2/,
+qr/send data to kafka: .*test2/]
+--- wait: 3
+
+
+
+=== TEST 3: log a error level message
+--- config
+    location /t {
+        content_by_lua_block {
+            local core = require("apisix.core")
+            core.log.error("this is a error message for test3.")
+        }
+    }
+--- response_body

Review Comment:
   Ditto



##########
t/plugin/error-log-logger-kafka.t:
##########
@@ -0,0 +1,195 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+log_level("info");
+repeat_each(1);
+no_long_string();
+no_root_location();
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    if (!defined $block->request) {
+        $block->set_value("request", "GET /t");
+    }
+
+    if (!defined $block->extra_yaml_config) {
+        my $extra_yaml_config = <<_EOC_;
+plugins:
+    - error-log-logger
+_EOC_
+        $block->set_value("extra_yaml_config", $extra_yaml_config);
+    }
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: test schema checker
+--- config
+    location /t {
+        content_by_lua_block {
+        local core = require("apisix.core")
+            local plugin = require("apisix.plugins.error-log-logger")
+            local ok, err = plugin.check_schema(
+                {
+                    kafka = {
+                        brokers = {
+                            {
+                                host = "127.0.0.1",
+                                port = 9092
+                            }
+                        },
+                        kafka_topic = "test2"
+                    }
+                },
+                core.schema.TYPE_METADATA
+            )
+            if not ok then
+                ngx.say(err)
+            end
+
+            ngx.say("done")
+        }
+    }
+--- response_body
+done
+
+
+
+=== TEST 2: put plugin metadata and log an error level message - no auth kafka
+--- config
+    location /t {
+        content_by_lua_block {
+            local core = require("apisix.core")
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/plugin_metadata/error-log-logger',
+                ngx.HTTP_PUT,
+                [[{
+                    "kafka": {
+                        "brokers": [{
+                            "host": "127.0.0.1",
+                            "port": 9092
+                        }],
+                        "kafka_topic": "test2"
+                    },
+                    "level": "ERROR",
+                    "inactive_timeout": 1
+                }]]
+                )
+            ngx.sleep(2)
+            core.log.error("this is a error message for test2.")
+        }
+    }
+--- response_body

Review Comment:
   The `response_body` section in these tests is unnecessary, as we don't provide responses at all.
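
   An empty suggestion can drop the quoted line:

   ```suggestion
   ```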



##########
apisix/plugins/error-log-logger.lua:
##########
@@ -272,6 +330,55 @@ local function send_to_clickhouse(log_message)
 end
 
 
+local function create_producer(broker_list, broker_config, cluster_name)
+    core.log.info("create new kafka producer instance")
+    return producer:new(broker_list, broker_config, cluster_name)
+end
+
+
+local function send_to_kafka(log_message)
+    core.log.info("sending a batch logs to kafka brokers: ", 
core.json.encode(config.kafka.brokers))

Review Comment:
   Can we use delay_encode?
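
   Same as above, e.g. (sketch):

   ```lua
   core.log.info("sending a batch logs to kafka brokers: ", core.json.delay_encode(config.kafka.brokers))
   ```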



##########
apisix/plugins/error-log-logger.lua:
##########
@@ -272,6 +330,55 @@ local function send_to_clickhouse(log_message)
 end
 
 
+local function create_producer(broker_list, broker_config, cluster_name)
+    core.log.info("create new kafka producer instance")
+    return producer:new(broker_list, broker_config, cluster_name)
+end
+
+
+local function send_to_kafka(log_message)
+    core.log.info("sending a batch logs to kafka brokers: ", 
core.json.encode(config.kafka.brokers))
+
+    local broker_config = {}
+    broker_config["request_timeout"] = config.timeout * 1000
+    broker_config["producer_type"] = config.kafka.producer_type
+    broker_config["required_acks"] = config.kafka.required_acks
+
+    local prod
+    local err
+
+    local metadata = plugin.plugin_metadata(plugin_name)
+    if not (metadata and metadata.value and metadata.modifiedIndex) then
+        core.log.info("please set the correct plugin_metadata for ", 
plugin_name)
+        return
+    else
+        -- reuse producer via lrucache to avoid unbalanced partitions of messages in kafka
+        prod, err = lrucache(plugin_name .. "#kafka", metadata.modifiedIndex,

Review Comment:
   Better to use a separate lrucache to cache different data.
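
   A sketch of what a dedicated cache could look like (the `ttl`/`count` values here are placeholders; `core.lrucache.new` is used this way in other loggers):

   ```lua
   -- a cache dedicated to kafka producers, separate from the shared lrucache
   local kafka_prod_lrucache = core.lrucache.new({
       ttl = 300, count = 32
   })

   -- then, in send_to_kafka:
   prod, err = kafka_prod_lrucache(plugin_name .. "#kafka", metadata.modifiedIndex,
                                   create_producer, config.kafka.brokers, broker_config,
                                   config.kafka.cluster_name)
   ```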



##########
docs/en/latest/plugins/error-log-logger.md:
##########
@@ -48,6 +48,17 @@ It might take some time to receive the log data. It will be automatically sent a
 | clickhouse.password              | String  | False    |         |                       | ClickHouse password.                                             |
 | clickhouse.database              | String  | False    |         |                       | Name of the database to store the logs.                          |
 | clickhouse.logtable              | String  | False    |         |                       | Table name to store the logs.                                    |
+| kafka.brokers                    | array   | True     |         |                       | List of Kafka brokers (nodes).                                   |
+| kafka.brokers.host               | string  | True     |         |                       | The host of Kafka broker, e.g, `192.168.1.1`.                    |
+| kafka.brokers.port               | integer | True     |         | [0, 65535]            | The port of Kafka broker                                         |
+| kafka.brokers.sasl_config        | object  | False    |         |                       | The sasl config of Kafka broker                                  |
+| kafka.brokers.sasl_config.mechanism | string | False   | "PLAIN" | ["PLAIN"]            | The mechaism of sasl config                                      |
+| kafka.brokers.sasl_config.user   | string  | True     |         |                       | The user of sasl_config. If sasl_config exists, it's required.   |
+| kafka.brokers.sasl_config.password | string | True    |         |                       | The password of sasl_config. If sasl_config exists, it's required. |
+| kafka.kafka_topic                | string  | True     |         |                       | Target topic to push the logs for organisation.                  |
+| kafka.producer_type              | string  | False    | async   | ["async", "sync"]     | Message sending mode of the producer.                            |
+| kafka.required_acks              | integer | False    | 1       | [0, 1, -1]            | Number of acknowledgements the leader needs to receive for the producer to consider the request complete. This controls the durability of the sent records. The attribute follows the same configuration as the Kafka `acks` attribute. See [Apache Kafka documentation](https://kafka.apache.org/documentation/#producerconfigs_acks) for more. |
+| kafka.key                        | string  | False    |         |                       | Key used for allocating partitions for messages.                 |

Review Comment:
   cluster_name is not documented?
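
   A possible row to add (the wording is a placeholder, loosely modeled on the kafka-logger doc):

   ```markdown
   | kafka.cluster_name | integer | False | 1 |  | Name of the cluster. Used when there are two or more Kafka clusters. Only works with the `async` producer_type. |
   ```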



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
