This is an automated email from the ASF dual-hosted git repository.

ashishtiwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new d58681967 feat(kafka-logger): add support for scram for authentication 
(#12693)
d58681967 is described below

commit d58681967ef5ed44227d7d15e20934cae3e2e08b
Author: Ashish Tiwari <[email protected]>
AuthorDate: Fri Oct 24 14:11:10 2025 +0530

    feat(kafka-logger): add support for scram for authentication (#12693)
---
 apisix/plugins/kafka-logger.lua                 |   2 +-
 ci/init-plugin-test-service.sh                  |  10 +-
 ci/pod/docker-compose.plugin.yml                |  30 +++
 ci/pod/kafka/kafka-server/env/common3-scram.env |  20 ++
 ci/pod/kafka/kafka-server/kafka_scram_jaas.conf |  26 +++
 docs/en/latest/plugins/kafka-logger.md          |   2 +-
 docs/zh/latest/plugins/error-log-logger.md      |   2 +-
 t/plugin/kafka-logger4.t                        | 275 ++++++++++++++++++++++++
 8 files changed, 363 insertions(+), 4 deletions(-)

diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua
index 75510f52c..ec4f694f6 100644
--- a/apisix/plugins/kafka-logger.lua
+++ b/apisix/plugins/kafka-logger.lua
@@ -77,7 +77,7 @@ local schema = {
                             mechanism = {
                                 type = "string",
                                 default = "PLAIN",
-                                enum = {"PLAIN"},
+                                enum = {"PLAIN", "SCRAM-SHA-256", 
"SCRAM-SHA-512"},
                             },
                             user = { type = "string", description = "user" },
                             password =  { type = "string", description = 
"password" },
diff --git a/ci/init-plugin-test-service.sh b/ci/init-plugin-test-service.sh
index 2da891e56..7b60607f5 100755
--- a/ci/init-plugin-test-service.sh
+++ b/ci/init-plugin-test-service.sh
@@ -20,7 +20,15 @@ after() {
     docker exec -i apache-apisix-kafka-server1-1 
/opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper 
zookeeper-server1:2181 --replication-factor 1 --partitions 1 --topic test2
     docker exec -i apache-apisix-kafka-server1-1 
/opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper 
zookeeper-server1:2181 --replication-factor 1 --partitions 3 --topic test3
     docker exec -i apache-apisix-kafka-server2-1 
/opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper 
zookeeper-server2:2181 --replication-factor 1 --partitions 1 --topic test4
-
+    docker exec -i apache-apisix-kafka-server3-scram-1 
/opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper 
zookeeper-server3:2181 --replication-factor 1 --partitions 1 --topic 
test-scram-256
+    docker exec -i apache-apisix-kafka-server3-scram-1 
/opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper 
zookeeper-server3:2181 --replication-factor 1 --partitions 1 --topic 
test-scram-512
+    # Create the admin user with both SCRAM-SHA-256 and SCRAM-SHA-512 credentials
+    docker exec apache-apisix-kafka-server3-scram-1 
/opt/bitnami/kafka/bin/kafka-configs.sh \
+        --zookeeper zookeeper-server3:2181 \
+        --alter \
+        --add-config 
'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' \
+        --entity-type users \
+        --entity-name admin
     # prepare openwhisk env
     docker pull openwhisk/action-nodejs-v14:1.20.0
     docker run --rm -d --name openwhisk -p 3233:3233 -p 3232:3232 -v 
/var/run/docker.sock:/var/run/docker.sock openwhisk/standalone:1.0.0
diff --git a/ci/pod/docker-compose.plugin.yml b/ci/pod/docker-compose.plugin.yml
index 22d0303f9..2c026b76a 100644
--- a/ci/pod/docker-compose.plugin.yml
+++ b/ci/pod/docker-compose.plugin.yml
@@ -80,6 +80,16 @@ services:
     networks:
       kafka_net:
 
+  zookeeper-server3:
+    image: bitnamilegacy/zookeeper:3.6.0
+    env_file:
+      - ci/pod/kafka/zookeeper-server/env/common.env
+    restart: unless-stopped
+    ports:
+      - "12182:12181"
+    networks:
+      kafka_net_2:
+
   kafka-server1:
     image: bitnamilegacy/kafka:2.8.1
     env_file:
@@ -113,6 +123,25 @@ services:
     volumes:
       - 
./ci/pod/kafka/kafka-server/kafka_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf:ro
 
+  kafka-server3-scram:
+    image: bitnamilegacy/kafka:2.8.1
+    env_file:
+      - ci/pod/kafka/kafka-server/env/common3-scram.env
+    environment:
+      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper-server3:2181
+    restart: unless-stopped
+    ports:
+      - "29092:29092"  # PLAINTEXT for inter-broker communication
+      - "29094:29094"  # SASL_SCRAM for clients
+    depends_on:
+      - zookeeper-server1
+      - zookeeper-server2
+      - zookeeper-server3
+    networks:
+      kafka_net_2:
+    volumes:
+      - 
./ci/pod/kafka/kafka-server/kafka_scram_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf:ro
+
   ## SkyWalking
   skywalking:
     image: apache/skywalking-oap-server:8.7.0-es6
@@ -392,6 +421,7 @@ services:
 networks:
   apisix_net:
   kafka_net:
+  kafka_net_2:
   skywalk_net:
   rocketmq_net:
   opa_net:
diff --git a/ci/pod/kafka/kafka-server/env/common3-scram.env 
b/ci/pod/kafka/kafka-server/env/common3-scram.env
new file mode 100644
index 000000000..945d544b6
--- /dev/null
+++ b/ci/pod/kafka/kafka-server/env/common3-scram.env
@@ -0,0 +1,20 @@
+ALLOW_PLAINTEXT_LISTENER=yes
+KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
+
+# Expose a SASL_PLAINTEXT listener (SCRAM mechanisms) for clients alongside PLAINTEXT
+KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:29092,SASL_PLAINTEXT://0.0.0.0:29094
+KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka-server3-scram:29092,SASL_PLAINTEXT://127.0.0.1:29094
+
+# SCRAM-specific configuration
+KAFKA_CFG_SASL_ENABLED_MECHANISMS=SCRAM-SHA-256,SCRAM-SHA-512
+KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAINTEXT
+
+# Security protocol for inter-broker communication (since it's a single-node 
cluster)
+KAFKA_CFG_SECURITY_INTER_BROKER_PROTOCOL=PLAINTEXT
+
+# Optional: Explicitly set the security protocol map
+KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT
+
+# Other configurations
+KAFKA_CFG_OFFSETS_TOPIC_NUM_PARTITIONS=1
+KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1
diff --git a/ci/pod/kafka/kafka-server/kafka_scram_jaas.conf 
b/ci/pod/kafka/kafka-server/kafka_scram_jaas.conf
new file mode 100644
index 000000000..01c832bd3
--- /dev/null
+++ b/ci/pod/kafka/kafka-server/kafka_scram_jaas.conf
@@ -0,0 +1,26 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+KafkaServer {
+    org.apache.kafka.common.security.scram.ScramLoginModule required
+    username="admin"
+    password="admin-secret";
+    org.apache.kafka.common.security.plain.PlainLoginModule required
+    username="admin"
+    password="admin-secret"
+    user_admin="admin-secret";
+};
diff --git a/docs/en/latest/plugins/kafka-logger.md 
b/docs/en/latest/plugins/kafka-logger.md
index a1a717c5e..de8bddd39 100644
--- a/docs/en/latest/plugins/kafka-logger.md
+++ b/docs/en/latest/plugins/kafka-logger.md
@@ -42,7 +42,7 @@ It might take some time to receive the log data. It will be 
automatically sent a
 | brokers.host           | string  | True     |                |               
        | The host of Kafka broker, e.g, `192.168.1.1`.                         
                                                                                
                                                                                
                                                                                
                                          |
 | brokers.port           | integer | True     |                |   [0, 65535]  
                |  The port of Kafka broker                                     
                                                                                
                                                                                
                                                                                
                             |
 | brokers.sasl_config    | object  | False    |                |               
                |  The sasl config of Kafka broker                              
                                                                                
                                                                                
                                                                                
                                   |
-| brokers.sasl_config.mechanism  | string  | False    | "PLAIN"          | 
["PLAIN"]           |     The mechaism of sasl config                           
                                                                                
                                                                                
                                                                                
                                  |
+| brokers.sasl_config.mechanism  | string  | False    | "PLAIN"          | 
["PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"]           |     The mechanism of 
sasl config                                                                     
                                                                                
                                                                                
                                                                        |
 | brokers.sasl_config.user       | string  | True     |                  |     
                |  The user of sasl_config. If sasl_config exists, it's 
required.                                                                       
                                                                                
                                                                                
                                                      |
 | brokers.sasl_config.password   | string  | True     |                  |     
                | The password of sasl_config. If sasl_config exists, it's 
required.                                                                       
                                                                                
                                                                                
                                                          |
 | kafka_topic            | string  | True     |                |               
        | Target topic to push the logs for organisation.                       
                                                                                
                                                                                
                                                                                
                           |
diff --git a/docs/zh/latest/plugins/error-log-logger.md 
b/docs/zh/latest/plugins/error-log-logger.md
index d9d559031..508955bf3 100644
--- a/docs/zh/latest/plugins/error-log-logger.md
+++ b/docs/zh/latest/plugins/error-log-logger.md
@@ -51,7 +51,7 @@ description: API 网关 Apache APISIX error-log-logger 插件用于将 
APISIX 
 | kafka.brokers.host                  | string  | 是   |                |       
                | Kafka broker 的节点 host 配置,例如 `192.168.1.1`|
 | kafka.brokers.port                  | string  | 是   |                |       
                | Kafka broker 的节点端口配置  |
 | kafka.brokers.sasl_config           | object  | 否   |                |       
                | Kafka broker 中的 sasl_config |
-| kafka.brokers.sasl_config.mechanism | string  | 否   | "PLAIN"          | 
["PLAIN"]   | Kafka broker 中的 sasl 认证机制 |
+| kafka.brokers.sasl_config.mechanism | string  | 否   | "PLAIN"          | 
["PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"]  | Kafka broker 中的 sasl 认证机制 |
 | kafka.brokers.sasl_config.user      | string  | 是   |                  |     
        | Kafka broker 中 sasl 配置中的 user,如果 sasl_config 存在,则必须填写 |
 | kafka.brokers.sasl_config.password  | string  | 是   |                  |     
        | Kafka broker 中 sasl 配置中的 password,如果 sasl_config 存在,则必须填写 |
 | kafka.kafka_topic                   | string  | 是   |                |       
                | 需要推送的 Kafka topic。|
diff --git a/t/plugin/kafka-logger4.t b/t/plugin/kafka-logger4.t
new file mode 100644
index 000000000..b4d3e456d
--- /dev/null
+++ b/t/plugin/kafka-logger4.t
@@ -0,0 +1,275 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+no_long_string();
+no_root_location();
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    if (!$block->request) {
+        $block->set_value("request", "GET /t");
+    }
+});
+
+run_tests;
+
+__DATA__
+
+=== TEST 1: set route with correct sasl_config - SCRAM-SHA-256
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "plugins":{
+                        "kafka-logger":{
+                            "brokers":[
+                            {
+                                "host":"127.0.0.1",
+                                "port":29094,
+                                "sasl_config":{
+                                    "mechanism":"SCRAM-SHA-256",
+                                    "user":"admin",
+                                    "password":"admin-secret"
+                            }
+                        }],
+                            "kafka_topic":"test-scram-256",
+                            "producer_type":"async",
+                            "key":"key1",
+                            "timeout":1,
+                            "batch_max_size":1,
+                            "include_req_body": true
+                        }
+                    },
+                    "upstream":{
+                        "nodes":{
+                            "127.0.0.1:1980":1
+                        },
+                        "type":"roundrobin"
+                    },
+                    "uri":"/hello"
+                }]]
+            )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 2: hit route, send data to kafka successfully
+--- request
+POST /hello?name=qwerty
+abcdef
+--- response_body
+hello world
+--- error_log eval
+qr/send data to kafka: \{.*"route_id":"1"/
+--- no_error_log
+[error]
+--- wait: 2
+
+
+
+=== TEST 3: set route with incorrect password - SCRAM-SHA-256
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "plugins":{
+                        "kafka-logger":{
+                            "brokers":[
+                            {
+                                "host":"127.0.0.1",
+                                "port":29094,
+                                "sasl_config":{
+                                    "mechanism":"SCRAM-SHA-256",
+                                    "user":"admin",
+                                    "password":"admin-secrets"
+                            }
+                        }],
+                            "kafka_topic":"test-scram-256",
+                            "producer_type":"async",
+                            "key":"key1",
+                            "timeout":1,
+                            "batch_max_size":1,
+                            "include_req_body": true
+                        }
+                    },
+                    "upstream":{
+                        "nodes":{
+                            "127.0.0.1:1980":1
+                        },
+                        "type":"roundrobin"
+                    },
+                    "uri":"/hello"
+                }]]
+            )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 4: hit route, send data to kafka unsuccessfully
+--- request
+POST /hello?name=qwerty
+abcdef
+--- response_body
+hello world
+--- error_log
+Authentication failed during authentication due to invalid credentials with 
SASL mechanism SCRAM-SHA-256
+--- wait: 2
+
+
+
+=== TEST 5: set route with correct sasl_config - SCRAM-SHA-512
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "plugins":{
+                        "kafka-logger":{
+                            "brokers":[
+                            {
+                                "host":"127.0.0.1",
+                                "port":29094,
+                                "sasl_config":{
+                                    "mechanism":"SCRAM-SHA-512",
+                                    "user":"admin",
+                                    "password":"admin-secret"
+                            }
+                        }],
+                            "kafka_topic":"test-scram-512",
+                            "producer_type":"async",
+                            "key":"key1",
+                            "timeout":1,
+                            "batch_max_size":1,
+                            "include_req_body": true
+                        }
+                    },
+                    "upstream":{
+                        "nodes":{
+                            "127.0.0.1:1980":1
+                        },
+                        "type":"roundrobin"
+                    },
+                    "uri":"/hello"
+                }]]
+            )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 6: hit route, send data to kafka successfully
+--- request
+POST /hello?name=qwerty
+abcdef
+--- response_body
+hello world
+--- error_log eval
+qr/send data to kafka: \{.*"route_id":"1"/
+--- no_error_log
+[error]
+--- wait: 2
+
+
+
+=== TEST 7: set route with incorrect password - SCRAM-SHA-512
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "plugins":{
+                        "kafka-logger":{
+                            "brokers":[
+                            {
+                                "host":"127.0.0.1",
+                                "port":29094,
+                                "sasl_config":{
+                                    "mechanism":"SCRAM-SHA-512",
+                                    "user":"admin",
+                                    "password":"admin-secrets"
+                            }
+                        }],
+                            "kafka_topic":"test-scram-256",
+                            "producer_type":"async",
+                            "key":"key1",
+                            "timeout":1,
+                            "batch_max_size":1,
+                            "include_req_body": true
+                        }
+                    },
+                    "upstream":{
+                        "nodes":{
+                            "127.0.0.1:1980":1
+                        },
+                        "type":"roundrobin"
+                    },
+                    "uri":"/hello"
+                }]]
+            )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 8: hit route, send data to kafka unsuccessfully
+--- request
+POST /hello?name=qwerty
+abcdef
+--- response_body
+hello world
+--- error_log
+Authentication failed during authentication due to invalid credentials with 
SASL mechanism SCRAM-SHA-512
+--- wait: 2

Reply via email to