This is an automated email from the ASF dual-hosted git repository.
ashishtiwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new d58681967 feat(kafka-logger): add support for scram for authentication
(#12693)
d58681967 is described below
commit d58681967ef5ed44227d7d15e20934cae3e2e08b
Author: Ashish Tiwari <[email protected]>
AuthorDate: Fri Oct 24 14:11:10 2025 +0530
feat(kafka-logger): add support for scram for authentication (#12693)
---
apisix/plugins/kafka-logger.lua | 2 +-
ci/init-plugin-test-service.sh | 10 +-
ci/pod/docker-compose.plugin.yml | 30 +++
ci/pod/kafka/kafka-server/env/common3-scram.env | 20 ++
ci/pod/kafka/kafka-server/kafka_scram_jaas.conf | 26 +++
docs/en/latest/plugins/kafka-logger.md | 2 +-
docs/zh/latest/plugins/error-log-logger.md | 2 +-
t/plugin/kafka-logger4.t | 275 ++++++++++++++++++++++++
8 files changed, 363 insertions(+), 4 deletions(-)
diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua
index 75510f52c..ec4f694f6 100644
--- a/apisix/plugins/kafka-logger.lua
+++ b/apisix/plugins/kafka-logger.lua
@@ -77,7 +77,7 @@ local schema = {
mechanism = {
type = "string",
default = "PLAIN",
- enum = {"PLAIN"},
+ enum = {"PLAIN", "SCRAM-SHA-256",
"SCRAM-SHA-512"},
},
user = { type = "string", description = "user" },
password = { type = "string", description =
"password" },
diff --git a/ci/init-plugin-test-service.sh b/ci/init-plugin-test-service.sh
index 2da891e56..7b60607f5 100755
--- a/ci/init-plugin-test-service.sh
+++ b/ci/init-plugin-test-service.sh
@@ -20,7 +20,15 @@ after() {
docker exec -i apache-apisix-kafka-server1-1
/opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper
zookeeper-server1:2181 --replication-factor 1 --partitions 1 --topic test2
docker exec -i apache-apisix-kafka-server1-1
/opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper
zookeeper-server1:2181 --replication-factor 1 --partitions 3 --topic test3
docker exec -i apache-apisix-kafka-server2-1
/opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper
zookeeper-server2:2181 --replication-factor 1 --partitions 1 --topic test4
-
+ docker exec -i apache-apisix-kafka-server3-scram-1
/opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper
zookeeper-server3:2181 --replication-factor 1 --partitions 1 --topic
test-scram-256
+ docker exec -i apache-apisix-kafka-server3-scram-1
/opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper
zookeeper-server3:2181 --replication-factor 1 --partitions 1 --topic
test-scram-512
+ # Create the admin user with both SCRAM-SHA-256 and SCRAM-SHA-512 credentials
+ docker exec apache-apisix-kafka-server3-scram-1
/opt/bitnami/kafka/bin/kafka-configs.sh \
+ --zookeeper zookeeper-server3:2181 \
+ --alter \
+ --add-config
'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' \
+ --entity-type users \
+ --entity-name admin
# prepare openwhisk env
docker pull openwhisk/action-nodejs-v14:1.20.0
docker run --rm -d --name openwhisk -p 3233:3233 -p 3232:3232 -v
/var/run/docker.sock:/var/run/docker.sock openwhisk/standalone:1.0.0
diff --git a/ci/pod/docker-compose.plugin.yml b/ci/pod/docker-compose.plugin.yml
index 22d0303f9..2c026b76a 100644
--- a/ci/pod/docker-compose.plugin.yml
+++ b/ci/pod/docker-compose.plugin.yml
@@ -80,6 +80,16 @@ services:
networks:
kafka_net:
+ zookeeper-server3:
+ image: bitnamilegacy/zookeeper:3.6.0
+ env_file:
+ - ci/pod/kafka/zookeeper-server/env/common.env
+ restart: unless-stopped
+ ports:
+ - "12182:12181"
+ networks:
+ kafka_net_2:
+
kafka-server1:
image: bitnamilegacy/kafka:2.8.1
env_file:
@@ -113,6 +123,25 @@ services:
volumes:
-
./ci/pod/kafka/kafka-server/kafka_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf:ro
+ kafka-server3-scram:
+ image: bitnamilegacy/kafka:2.8.1
+ env_file:
+ - ci/pod/kafka/kafka-server/env/common3-scram.env
+ environment:
+ KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper-server3:2181
+ restart: unless-stopped
+ ports:
+ - "29092:29092" # PLAINTEXT for inter-broker communication
+ - "29094:29094" # SASL_SCRAM for clients
+ depends_on:
+ - zookeeper-server1
+ - zookeeper-server2
+ - zookeeper-server3
+ networks:
+ kafka_net_2:
+ volumes:
+ -
./ci/pod/kafka/kafka-server/kafka_scram_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf:ro
+
## SkyWalking
skywalking:
image: apache/skywalking-oap-server:8.7.0-es6
@@ -392,6 +421,7 @@ services:
networks:
apisix_net:
kafka_net:
+ kafka_net_2:
skywalk_net:
rocketmq_net:
opa_net:
diff --git a/ci/pod/kafka/kafka-server/env/common3-scram.env
b/ci/pod/kafka/kafka-server/env/common3-scram.env
new file mode 100644
index 000000000..945d544b6
--- /dev/null
+++ b/ci/pod/kafka/kafka-server/env/common3-scram.env
@@ -0,0 +1,20 @@
+ALLOW_PLAINTEXT_LISTENER=yes
+KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
+
+# Listeners: PLAINTEXT (29092) for inter-broker traffic, SASL_PLAINTEXT (29094) for SCRAM clients
+KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:29092,SASL_PLAINTEXT://0.0.0.0:29094
+KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka-server3-scram:29092,SASL_PLAINTEXT://127.0.0.1:29094
+
+# SCRAM-specific configuration
+KAFKA_CFG_SASL_ENABLED_MECHANISMS=SCRAM-SHA-256,SCRAM-SHA-512
+KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAINTEXT
+
+# Security protocol for inter-broker communication (since it's a single-node cluster)
+KAFKA_CFG_SECURITY_INTER_BROKER_PROTOCOL=PLAINTEXT
+
+# Optional: Explicitly set the security protocol map
+KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT
+
+# Other configurations
+KAFKA_CFG_OFFSETS_TOPIC_NUM_PARTITIONS=1
+KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1
diff --git a/ci/pod/kafka/kafka-server/kafka_scram_jaas.conf
b/ci/pod/kafka/kafka-server/kafka_scram_jaas.conf
new file mode 100644
index 000000000..01c832bd3
--- /dev/null
+++ b/ci/pod/kafka/kafka-server/kafka_scram_jaas.conf
@@ -0,0 +1,26 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+KafkaServer {
+ org.apache.kafka.common.security.scram.ScramLoginModule required
+ username="admin"
+ password="admin-secret";
+ org.apache.kafka.common.security.plain.PlainLoginModule required
+ username="admin"
+ password="admin-secret"
+ user_admin="admin-secret";
+};
diff --git a/docs/en/latest/plugins/kafka-logger.md
b/docs/en/latest/plugins/kafka-logger.md
index a1a717c5e..de8bddd39 100644
--- a/docs/en/latest/plugins/kafka-logger.md
+++ b/docs/en/latest/plugins/kafka-logger.md
@@ -42,7 +42,7 @@ It might take some time to receive the log data. It will be
automatically sent a
| brokers.host | string | True | |
| The host of Kafka broker, e.g, `192.168.1.1`.
|
| brokers.port | integer | True | | [0, 65535]
| The port of Kafka broker
|
| brokers.sasl_config | object | False | |
| The sasl config of Kafka broker
|
-| brokers.sasl_config.mechanism | string | False | "PLAIN" |
["PLAIN"] | The mechaism of sasl config
|
+| brokers.sasl_config.mechanism | string | False | "PLAIN" |
["PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"] | The mechanism of
sasl config
|
| brokers.sasl_config.user | string | True | |
| The user of sasl_config. If sasl_config exists, it's
required.
|
| brokers.sasl_config.password | string | True | |
| The password of sasl_config. If sasl_config exists, it's
required.
|
| kafka_topic | string | True | |
| Target topic to push the logs for organisation.
|
diff --git a/docs/zh/latest/plugins/error-log-logger.md
b/docs/zh/latest/plugins/error-log-logger.md
index d9d559031..508955bf3 100644
--- a/docs/zh/latest/plugins/error-log-logger.md
+++ b/docs/zh/latest/plugins/error-log-logger.md
@@ -51,7 +51,7 @@ description: API 网关 Apache APISIX error-log-logger 插件用于将
APISIX
| kafka.brokers.host | string | 是 | |
| Kafka broker 的节点 host 配置,例如 `192.168.1.1`|
| kafka.brokers.port | string | 是 | |
| Kafka broker 的节点端口配置 |
| kafka.brokers.sasl_config | object | 否 | |
| Kafka broker 中的 sasl_config |
-| kafka.brokers.sasl_config.mechanism | string | 否 | "PLAIN" |
["PLAIN"] | Kafka broker 中的 sasl 认证机制 |
+| kafka.brokers.sasl_config.mechanism | string | 否 | "PLAIN" |
["PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"] | Kafka broker 中的 sasl 认证机制 |
| kafka.brokers.sasl_config.user | string | 是 | |
| Kafka broker 中 sasl 配置中的 user,如果 sasl_config 存在,则必须填写 |
| kafka.brokers.sasl_config.password | string | 是 | |
| Kafka broker 中 sasl 配置中的 password,如果 sasl_config 存在,则必须填写 |
| kafka.kafka_topic | string | 是 | |
| 需要推送的 Kafka topic。|
diff --git a/t/plugin/kafka-logger4.t b/t/plugin/kafka-logger4.t
new file mode 100644
index 000000000..b4d3e456d
--- /dev/null
+++ b/t/plugin/kafka-logger4.t
@@ -0,0 +1,275 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+use t::APISIX 'no_plan';
+
+repeat_each(1);
+no_long_string();
+no_root_location();
+
+add_block_preprocessor(sub {
+ my ($block) = @_;
+
+ if (!$block->request) {
+ $block->set_value("request", "GET /t");
+ }
+});
+
+run_tests;
+
+__DATA__
+
+=== TEST 1: set route with correct sasl_config - SCRAM-SHA-256
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "plugins":{
+ "kafka-logger":{
+ "brokers":[
+ {
+ "host":"127.0.0.1",
+ "port":29094,
+ "sasl_config":{
+ "mechanism":"SCRAM-SHA-256",
+ "user":"admin",
+ "password":"admin-secret"
+ }
+ }],
+ "kafka_topic":"test-scram-256",
+ "producer_type":"async",
+ "key":"key1",
+ "timeout":1,
+ "batch_max_size":1,
+ "include_req_body": true
+ }
+ },
+ "upstream":{
+ "nodes":{
+ "127.0.0.1:1980":1
+ },
+ "type":"roundrobin"
+ },
+ "uri":"/hello"
+ }]]
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- response_body
+passed
+
+
+
+=== TEST 2: hit route, send data to kafka successfully
+--- request
+POST /hello?name=qwerty
+abcdef
+--- response_body
+hello world
+--- error_log eval
+qr/send data to kafka: \{.*"route_id":"1"/
+--- no_error_log
+[error]
+--- wait: 2
+
+
+
+=== TEST 3: set route with incorrect password - SCRAM-SHA-256
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "plugins":{
+ "kafka-logger":{
+ "brokers":[
+ {
+ "host":"127.0.0.1",
+ "port":29094,
+ "sasl_config":{
+ "mechanism":"SCRAM-SHA-256",
+ "user":"admin",
+ "password":"admin-secrets"
+ }
+ }],
+ "kafka_topic":"test-scram-256",
+ "producer_type":"async",
+ "key":"key1",
+ "timeout":1,
+ "batch_max_size":1,
+ "include_req_body": true
+ }
+ },
+ "upstream":{
+ "nodes":{
+ "127.0.0.1:1980":1
+ },
+ "type":"roundrobin"
+ },
+ "uri":"/hello"
+ }]]
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- response_body
+passed
+
+
+
+=== TEST 4: hit route, send data to kafka unsuccessfully
+--- request
+POST /hello?name=qwerty
+abcdef
+--- response_body
+hello world
+--- error_log
+Authentication failed during authentication due to invalid credentials with
SASL mechanism SCRAM-SHA-256
+--- wait: 2
+
+
+
+=== TEST 5: set route with correct sasl_config - SCRAM-SHA-512
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "plugins":{
+ "kafka-logger":{
+ "brokers":[
+ {
+ "host":"127.0.0.1",
+ "port":29094,
+ "sasl_config":{
+ "mechanism":"SCRAM-SHA-512",
+ "user":"admin",
+ "password":"admin-secret"
+ }
+ }],
+ "kafka_topic":"test-scram-512",
+ "producer_type":"async",
+ "key":"key1",
+ "timeout":1,
+ "batch_max_size":1,
+ "include_req_body": true
+ }
+ },
+ "upstream":{
+ "nodes":{
+ "127.0.0.1:1980":1
+ },
+ "type":"roundrobin"
+ },
+ "uri":"/hello"
+ }]]
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- response_body
+passed
+
+
+
+=== TEST 6: hit route, send data to kafka successfully
+--- request
+POST /hello?name=qwerty
+abcdef
+--- response_body
+hello world
+--- error_log eval
+qr/send data to kafka: \{.*"route_id":"1"/
+--- no_error_log
+[error]
+--- wait: 2
+
+
+
+=== TEST 7: set route with incorrect password - SCRAM-SHA-512
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "plugins":{
+ "kafka-logger":{
+ "brokers":[
+ {
+ "host":"127.0.0.1",
+ "port":29094,
+ "sasl_config":{
+ "mechanism":"SCRAM-SHA-512",
+ "user":"admin",
+ "password":"admin-secrets"
+ }
+ }],
+ "kafka_topic":"test-scram-256",
+ "producer_type":"async",
+ "key":"key1",
+ "timeout":1,
+ "batch_max_size":1,
+ "include_req_body": true
+ }
+ },
+ "upstream":{
+ "nodes":{
+ "127.0.0.1:1980":1
+ },
+ "type":"roundrobin"
+ },
+ "uri":"/hello"
+ }]]
+ )
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- response_body
+passed
+
+
+
+=== TEST 8: hit route, send data to kafka unsuccessfully
+--- request
+POST /hello?name=qwerty
+abcdef
+--- response_body
+hello world
+--- error_log
+Authentication failed during authentication due to invalid credentials with
SASL mechanism SCRAM-SHA-512
+--- wait: 2