spacewander commented on code in PR #6995: URL: https://github.com/apache/apisix/pull/6995#discussion_r869925340
########## apisix/core/pubsub.lua: ########## @@ -0,0 +1,167 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +--- Extensible framework to support publish-and-subscribe scenarios +-- +-- @module core.pubsub + +local log = require("apisix.core.log") +local ws_server = require("resty.websocket.server") +local protoc = require("protoc") +local pb = require("pb") +local setmetatable = setmetatable +local pcall = pcall +local pairs = pairs + +protoc.reload() +pb.option("int64_as_string") +local pubsub_protoc = protoc.new() + + +local _M = { version = 0.1 } +local mt = { __index = _M } + + +--- +-- Create pubsub module instance +-- +-- @function core.pubsub.new +-- @treturn pubsub module instance +-- @treturn string|nil error message if present +-- @usage +-- local pubsub, err = core.pubsub.new() +function _M.new() + -- compile the protobuf file on initial load module + -- ensure that each worker is loaded once + if not pubsub_protoc.loaded["pubsub.proto"] then + pubsub_protoc:addpath("apisix/include/apisix/model") + local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto") + if not ok then + pubsub_protoc:reset() + return nil, "failed to load pubsub protocol: "..err + end + end + + local ws, err = 
ws_server:new() + if not ws then + return nil, err + end + + local obj = setmetatable({ + ws_server = ws, + cmd_handler = {}, + }, mt) + + return obj +end + + +--- +-- Add command callbacks to pubsub module instances +-- +-- The callback function prototype: function (params) +-- The params in the parameters contain the data defined in the requested command. +-- Its first return value is the data, which needs to contain the data needed for +-- the particular resp, returns nil if an error exists. +-- Its second return value is a string type error message, no need to return when +-- no error exists. +-- +-- @function core.pubsub.on +-- @usage +-- pubsub:on(command, function (params) +-- return data, err +-- end) +function _M.on(self, command, handler) + self.cmd_handler[command] = handler +end + + +--- +-- Put the pubsub instance into an event loop, waiting to process client commands +-- +-- @function core.pubsub.wait +-- @treturn string|nil error message if present, will terminate the event loop +-- @usage +-- local err = pubsub:wait() +function _M.wait(self) + local ws = self.ws_server + while true do + -- read raw data frames from websocket connection + local raw_data, raw_type, err = ws:recv_frame() + if err then + -- terminate the event loop when a fatal error occurs + if ws.fatal then + ws:send_close() + return "websocket server: "..err + end + + -- skip this loop for non-fatal errors + log.error("failed to receive websocket frame: "..err) Review Comment: ```suggestion log.error("failed to receive websocket frame: ", err) ``` ########## apisix/core/pubsub.lua: ########## @@ -0,0 +1,167 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. 
+-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +--- Extensible framework to support publish-and-subscribe scenarios +-- +-- @module core.pubsub + +local log = require("apisix.core.log") +local ws_server = require("resty.websocket.server") +local protoc = require("protoc") +local pb = require("pb") +local setmetatable = setmetatable +local pcall = pcall +local pairs = pairs + +protoc.reload() +pb.option("int64_as_string") +local pubsub_protoc = protoc.new() + + +local _M = { version = 0.1 } +local mt = { __index = _M } + + +--- +-- Create pubsub module instance +-- +-- @function core.pubsub.new +-- @treturn pubsub module instance +-- @treturn string|nil error message if present +-- @usage +-- local pubsub, err = core.pubsub.new() +function _M.new() + -- compile the protobuf file on initial load module + -- ensure that each worker is loaded once + if not pubsub_protoc.loaded["pubsub.proto"] then + pubsub_protoc:addpath("apisix/include/apisix/model") + local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto") + if not ok then + pubsub_protoc:reset() + return nil, "failed to load pubsub protocol: "..err + end + end + + local ws, err = ws_server:new() + if not ws then + return nil, err + end + + local obj = setmetatable({ + ws_server = ws, + cmd_handler = {}, + }, mt) + + return obj +end + + +--- +-- Add command callbacks to pubsub module instances +-- +-- The callback function prototype: function (params) +-- The 
params in the parameters contain the data defined in the requested command. +-- Its first return value is the data, which needs to contain the data needed for +-- the particular resp, returns nil if an error exists. +-- Its second return value is a string type error message, no need to return when +-- no error exists. +-- +-- @function core.pubsub.on Review Comment: Let's add some `@tparam` ########## apisix/core/pubsub.lua: ########## @@ -0,0 +1,167 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+-- + +--- Extensible framework to support publish-and-subscribe scenarios +-- +-- @module core.pubsub + +local log = require("apisix.core.log") +local ws_server = require("resty.websocket.server") +local protoc = require("protoc") +local pb = require("pb") +local setmetatable = setmetatable +local pcall = pcall +local pairs = pairs + +protoc.reload() +pb.option("int64_as_string") +local pubsub_protoc = protoc.new() + + +local _M = { version = 0.1 } +local mt = { __index = _M } + + +--- +-- Create pubsub module instance +-- +-- @function core.pubsub.new +-- @treturn pubsub module instance +-- @treturn string|nil error message if present +-- @usage +-- local pubsub, err = core.pubsub.new() +function _M.new() + -- compile the protobuf file on initial load module + -- ensure that each worker is loaded once + if not pubsub_protoc.loaded["pubsub.proto"] then + pubsub_protoc:addpath("apisix/include/apisix/model") + local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto") + if not ok then + pubsub_protoc:reset() + return nil, "failed to load pubsub protocol: "..err + end + end + + local ws, err = ws_server:new() + if not ws then + return nil, err + end + + local obj = setmetatable({ + ws_server = ws, + cmd_handler = {}, + }, mt) + + return obj +end + + +--- +-- Add command callbacks to pubsub module instances +-- +-- The callback function prototype: function (params) +-- The params in the parameters contain the data defined in the requested command. +-- Its first return value is the data, which needs to contain the data needed for +-- the particular resp, returns nil if an error exists. +-- Its second return value is a string type error message, no need to return when +-- no error exists. 
+-- +-- @function core.pubsub.on +-- @usage +-- pubsub:on(command, function (params) +-- return data, err +-- end) +function _M.on(self, command, handler) + self.cmd_handler[command] = handler +end + + +--- +-- Put the pubsub instance into an event loop, waiting to process client commands +-- +-- @function core.pubsub.wait +-- @treturn string|nil error message if present, will terminate the event loop +-- @usage +-- local err = pubsub:wait() +function _M.wait(self) + local ws = self.ws_server + while true do + -- read raw data frames from websocket connection + local raw_data, raw_type, err = ws:recv_frame() + if err then + -- terminate the event loop when a fatal error occurs + if ws.fatal then + ws:send_close() + return "websocket server: "..err Review Comment: Better to use break in the while loop and handle the error in the same place ########## apisix/core/pubsub.lua: ########## @@ -0,0 +1,167 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+-- + +--- Extensible framework to support publish-and-subscribe scenarios +-- +-- @module core.pubsub + +local log = require("apisix.core.log") +local ws_server = require("resty.websocket.server") +local protoc = require("protoc") +local pb = require("pb") +local setmetatable = setmetatable +local pcall = pcall +local pairs = pairs + +protoc.reload() +pb.option("int64_as_string") +local pubsub_protoc = protoc.new() + + +local _M = { version = 0.1 } +local mt = { __index = _M } + + +--- +-- Create pubsub module instance +-- +-- @function core.pubsub.new +-- @treturn pubsub module instance +-- @treturn string|nil error message if present +-- @usage +-- local pubsub, err = core.pubsub.new() +function _M.new() + -- compile the protobuf file on initial load module + -- ensure that each worker is loaded once + if not pubsub_protoc.loaded["pubsub.proto"] then + pubsub_protoc:addpath("apisix/include/apisix/model") + local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto") + if not ok then + pubsub_protoc:reset() + return nil, "failed to load pubsub protocol: "..err + end + end + + local ws, err = ws_server:new() + if not ws then + return nil, err + end + + local obj = setmetatable({ + ws_server = ws, + cmd_handler = {}, + }, mt) + + return obj +end + + +--- +-- Add command callbacks to pubsub module instances +-- +-- The callback function prototype: function (params) +-- The params in the parameters contain the data defined in the requested command. +-- Its first return value is the data, which needs to contain the data needed for +-- the particular resp, returns nil if an error exists. +-- Its second return value is a string type error message, no need to return when +-- no error exists. 
+-- +-- @function core.pubsub.on +-- @usage +-- pubsub:on(command, function (params) +-- return data, err +-- end) +function _M.on(self, command, handler) + self.cmd_handler[command] = handler +end + + +--- +-- Put the pubsub instance into an event loop, waiting to process client commands +-- +-- @function core.pubsub.wait +-- @treturn string|nil error message if present, will terminate the event loop +-- @usage +-- local err = pubsub:wait() +function _M.wait(self) + local ws = self.ws_server + while true do + -- read raw data frames from websocket connection + local raw_data, raw_type, err = ws:recv_frame() + if err then + -- terminate the event loop when a fatal error occurs + if ws.fatal then + ws:send_close() + return "websocket server: "..err + end + + -- skip this loop for non-fatal errors + log.error("failed to receive websocket frame: "..err) + goto continue + end + + -- handle client close connection + if raw_type == "close" then + ws:send_close() + return + end + + -- the pub-sub messages use binary, if the message is not + -- binary, skip this message + if raw_type ~= "binary" then + goto continue Review Comment: Better to add warn log for unexpected input ########## t/pubsub/kafka.t: ########## @@ -0,0 +1,418 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# +use t::APISIX 'no_plan'; + +repeat_each(1); +no_long_string(); +no_root_location(); + +add_block_preprocessor(sub { + my ($block) = @_; + + if ((!defined $block->error_log) && (!defined $block->no_error_log)) { + $block->set_value("no_error_log", "[error]"); + } + + if (!defined $block->request) { + $block->set_value("request", "GET /t"); + } +}); + +run_tests(); + +__DATA__ + +=== TEST 1: setup all-in-one test +--- config + location /t { + content_by_lua_block { + local data = { + { + url = "/apisix/admin/routes/kafka", + data = [[{ + "upstream": { + "nodes": { + "127.0.0.1:9092": 1 + }, + "type": "none", + "scheme": "kafka" + }, + "uri": "/kafka" + }]], + }, + { + url = "/apisix/admin/routes/kafka-tlsv", + data = [[{ + "upstream": { + "nodes": { + "127.0.0.1:9093": 1 + }, + "type": "none", + "scheme": "kafka", + "tls": { + "verify": true + } + }, + "uri": "/kafka-tlsv" + }]], + }, + { + url = "/apisix/admin/routes/kafka-tls", + data = [[{ + "upstream": { + "nodes": { + "127.0.0.1:9093": 1 + }, + "type": "none", + "scheme": "kafka", + "tls": { + "verify": false + } + }, + "uri": "/kafka-tls" + }]], + }, + { + url = "/apisix/admin/routes/kafka-sasl", + data = [[{ + "upstream": { + "nodes": { + "127.0.0.1:9094": 1 + }, + "type": "none", + "scheme": "kafka" + }, + "uri": "/kafka-sasl", + "plugins": { + "kafka-proxy": { + "enable_sasl": true, + "sasl": { + "username": "admin", + "password": "admin-secret" + } + } + } + }]], + } + } + + local t = require("lib.test_admin").test + + for _, data in ipairs(data) do + local code, body = t(data.url, ngx.HTTP_PUT, data.data) + ngx.say(code..body) + end + } + } +--- response_body eval +"201passed\n"x4 + + + +=== TEST 2: hit route (with HTTP request) +--- request +GET /kafka +--- error_code: 400 +--- error_log +failed to initialize pub-sub module, err: bad "upgrade" request header: nil + + + +=== TEST 3: hit route 
(normal Kafka) +--- config + location /t { + content_by_lua_block { + local protoc = require("protoc") + local pb = require("pb") + protoc.reload() + pb.option("int64_as_string") + local pubsub_protoc = protoc.new() + pubsub_protoc:addpath("apisix/include/apisix/model") + local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto") Review Comment: Bad indent? ########## t/admin/upstream5.t: ########## @@ -0,0 +1,62 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +use t::APISIX 'no_plan'; + +repeat_each(1); +no_long_string(); +no_root_location(); +no_shuffle(); +log_level("info"); + +add_block_preprocessor(sub { + my ($block) = @_; + + if (!$block->request) { + $block->set_value("request", "GET /t"); + } + + if (!$block->no_error_log && !$block->error_log) { + $block->set_value("no_error_log", "[error]\n[alert]"); + } +}); + +run_tests; + +__DATA__ + +=== TEST 1: set upstream(kafka scheme) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin") + local code, body = t.test("/apisix/admin/upstreams/kafka", ngx.HTTP_PUT, [[{ + "nodes": { + "127.0.0.1:9092": 1 + }, + "type": "none", + "scheme": "kafka" + }]]) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- error_code: 200 Review Comment: We don't need to check the default error_code ########## docs/en/latest/plugins/kafka-proxy.md: ########## @@ -0,0 +1,81 @@ +--- +title: kafka-proxy +keywords: + - APISIX + - Plugin + - Kafka proxy +description: This document contains information about the Apache APISIX kafka-proxy Plugin. +--- + +<!-- +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +--> + +## Description + +The `kafka-proxy` plugin can be used to configure advanced parameters for the kafka upstream of Apache APISIX, such as SASL authentication. + +## Attributes + +| Name | Type | Required | Default | Valid values | Description | +|-------------------|---------|----------|---------|---------------|------------------------------------| +| enable_sasl | boolean | False | false | | Enable SASL authentication | +| sasl_username | string | False | "" | | SASL/PLAIN authentication username | +| sasl_password | string | False | "" | | SASL/PLAIN authentication password | + +:::note +If SASL authentication is enabled, the `sasl_username` and `sasl_password` must be set. +The current SASL authentication only supports PLAIN mode, which is the username password login method. +::: + +## Example usage + +When we use scheme as the upstream of kafka, we can add kafka authentication configuration to it through this plugin. + +```shell +curl -X PUT 'http://127.0.0.1:9080/apisix/admin/routes/r1' \ + -H 'X-API-KEY: <api-key>' \ + -H 'Content-Type: application/json' \ + -d '{ + "uri": "/kafka", + "plugins": { + "kafka-proxy": { + "enable_tls": true, + "ssl_verify": true, + "enable_sasl": true, + "sasl_username": "user", Review Comment: Please update the example when the code part is accepted. ########## t/pubsub/kafka.t: ########## @@ -0,0 +1,418 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +use t::APISIX 'no_plan'; + +repeat_each(1); +no_long_string(); +no_root_location(); + +add_block_preprocessor(sub { + my ($block) = @_; + + if ((!defined $block->error_log) && (!defined $block->no_error_log)) { + $block->set_value("no_error_log", "[error]"); + } + + if (!defined $block->request) { + $block->set_value("request", "GET /t"); + } +}); + +run_tests(); + +__DATA__ + +=== TEST 1: setup all-in-one test +--- config + location /t { + content_by_lua_block { + local data = { + { + url = "/apisix/admin/routes/kafka", + data = [[{ + "upstream": { + "nodes": { + "127.0.0.1:9092": 1 + }, + "type": "none", + "scheme": "kafka" + }, + "uri": "/kafka" + }]], + }, + { + url = "/apisix/admin/routes/kafka-tlsv", + data = [[{ + "upstream": { + "nodes": { + "127.0.0.1:9093": 1 + }, + "type": "none", + "scheme": "kafka", + "tls": { + "verify": true + } + }, + "uri": "/kafka-tlsv" + }]], + }, + { + url = "/apisix/admin/routes/kafka-tls", + data = [[{ + "upstream": { + "nodes": { + "127.0.0.1:9093": 1 + }, + "type": "none", + "scheme": "kafka", + "tls": { + "verify": false + } + }, + "uri": "/kafka-tls" + }]], + }, + { + url = "/apisix/admin/routes/kafka-sasl", + data = [[{ + "upstream": { + "nodes": { + "127.0.0.1:9094": 1 + }, + "type": "none", + "scheme": "kafka" + }, + "uri": "/kafka-sasl", + "plugins": { + "kafka-proxy": { + "enable_sasl": true, + "sasl": { + "username": "admin", + "password": "admin-secret" + } + } + } + }]], + } + } + + local t = require("lib.test_admin").test + + for _, data in ipairs(data) do + local code, body = t(data.url, ngx.HTTP_PUT, 
data.data) + ngx.say(code..body) + end + } + } +--- response_body eval +"201passed\n"x4 + + + +=== TEST 2: hit route (with HTTP request) +--- request +GET /kafka +--- error_code: 400 +--- error_log +failed to initialize pub-sub module, err: bad "upgrade" request header: nil + + + +=== TEST 3: hit route (normal Kafka) +--- config + location /t { + content_by_lua_block { + local protoc = require("protoc") + local pb = require("pb") + protoc.reload() + pb.option("int64_as_string") + local pubsub_protoc = protoc.new() + pubsub_protoc:addpath("apisix/include/apisix/model") + local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto") + if not ok then + ngx.say("failed to load protocol: " .. err) + return + end + + local client = require "resty.websocket.client" + local ws, err = client:new() + local ok, err = ws:connect("ws://127.0.0.1:1984/kafka") + if not ok then + ngx.say("failed to connect: " .. err) + return + end + + local data = { + { + sequence = 0, + cmd_kafka_list_offset = { + topic = "not-exist", + partition = 0, + timestamp = -1, + }, + }, + { + sequence = 1, + cmd_kafka_fetch = { + topic = "not-exist", + partition = 0, + offset = 0, + }, + }, + { + sequence = 2, + cmd_kafka_list_offset = { + topic = "test-consumer", + partition = 0, + timestamp = -2, + }, + }, + { + sequence = 3, + cmd_kafka_list_offset = { + topic = "test-consumer", + partition = 0, + timestamp = -1, + }, + }, + { + sequence = 4, + cmd_kafka_fetch = { + topic = "test-consumer", + partition = 0, + offset = 14, + }, + } + } + + for i = 1, #data do + local _, err = ws:send_binary(pb.encode("PubSubReq", data[i])) + local raw_data, raw_type, err = ws:recv_frame() + if not raw_data then + ngx.say("failed to receive the frame: ", err) + return + end + local data, err = pb.decode("PubSubResp", raw_data) + if not data then + ngx.say("failed to decode the frame: ", err) + return + end + + if data.error_resp then + ngx.say(data.sequence..data.error_resp.message) + end + if 
data.kafka_list_offset_resp then + ngx.say(data.sequence.."offset: "..data.kafka_list_offset_resp.offset) + end + if data.kafka_fetch_resp then + ngx.say(data.sequence.."offset: "..data.kafka_fetch_resp.messages[1].offset.. + " msg: "..data.kafka_fetch_resp.messages[1].value) + end + end + + ws:send_close() + } + } +--- response_body +0failed to list offset, topic: not-exist, partition: 0, err: not found topic +1failed to fetch message, topic: not-exist, partition: 0, err: not found topic +2offset: 0 +3offset: 30 +4offset: 14 msg: testmsg15 + + + +=== TEST 4: hit route (Kafka with TLS) +--- config + location /t { + content_by_lua_block { + local protoc = require("protoc") Review Comment: We can extract the common part into a helper, and put it in https://github.com/apache/apisix/tree/master/t/lib? ########## apisix/plugins/kafka-proxy.lua: ########## @@ -0,0 +1,71 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +local core = require("apisix.core") + + +local schema = { + type = "object", + properties = { + enable_sasl = { + type = "boolean", + default = false, + }, + sasl = { Review Comment: Looks like we can remove `enable_sasl` because when sasl is enabled, the username / password are required.
So we can use `sasl` field to represent `enable_sasl` ########## apisix/schema_def.lua: ########## @@ -406,8 +406,17 @@ local upstream_schema = { properties = { client_cert = certificate_scheme, client_key = private_key_schema, + verify = { + type = "boolean", + description = "Turn on server certificate verification, ".. + "currently only kafka upstream is supported", + default = false, + }, }, - required = {"client_cert", "client_key"}, + anyOf = { + {required = {"client_cert", "client_key"}}, + {required = {"verify"}}, Review Comment: We don't need to require verify as it always has a default value ########## apisix/pubsub/kafka.lua: ########## @@ -0,0 +1,137 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +local core = require("apisix.core") +local bconsumer = require("resty.kafka.basic-consumer") +local ffi = require("ffi") +local C = ffi.C +local tostring = tostring +local type = type +local ipairs = ipairs +local str_sub = string.sub + +ffi.cdef[[ + int64_t atoll(const char *num); +]] + + +local _M = {} + +-- Takes over requests of type kafka upstream in the http_access phase. 
+function _M.access(api_ctx) + local pubsub, err = core.pubsub.new() + if not pubsub then + core.log.error("failed to initialize pub-sub module, err: ", err) + core.response.exit(400) + return + end + + local up_nodes = api_ctx.matched_upstream.nodes + + -- kafka client broker-related configuration + local broker_list = {} + for i, node in ipairs(up_nodes) do + broker_list[i] = { + host = node.host, + port = node.port, + } + + if api_ctx.kafka_consumer_enable_sasl then + broker_list[i].sasl_config = { + mechanism = "PLAIN", + user = api_ctx.kafka_consumer_sasl_username, + password = api_ctx.kafka_consumer_sasl_password, + } + end + end + + -- kafka client socket-related configuration (TLS, ssl verify) + local client_config = {refresh_interval = 30 * 60 * 1000} + if api_ctx.matched_upstream.tls then + client_config = { + ssl = true, + ssl_verify = api_ctx.matched_upstream.tls.verify, + } + end + + -- load and create the consumer instance when it is determined + -- that the websocket connection was created successfully + local consumer = bconsumer:new(broker_list, client_config) + + pubsub:on("cmd_kafka_list_offset", function (params) + -- The timestamp parameter uses a 64-bit integer, which is difficult + -- for luajit to handle well, so the int64_as_string option in + -- lua-protobuf is used here. Smaller numbers will be decoded as + -- lua number, while overly larger numbers will be decoded as strings + -- in the format #number, where the # symbol at the beginning of the + -- string will be removed and converted to int64_t with the atoll function. 
+ local timestamp = type(params.timestamp) == "string" and + C.atoll(str_sub(params.timestamp, 2, #params.timestamp)) or params.timestamp Review Comment: We can use this technique to avoid the substring call: https://github.com/apache/apisix/blob/bf3e23274b6f4a1d67d45da9901e928a0784e570/apisix/core/string.lua#L81 ########## apisix/pubsub/kafka.lua: ########## @@ -0,0 +1,137 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- + +local core = require("apisix.core") +local bconsumer = require("resty.kafka.basic-consumer") +local ffi = require("ffi") +local C = ffi.C +local tostring = tostring +local type = type +local ipairs = ipairs +local str_sub = string.sub + +ffi.cdef[[ + int64_t atoll(const char *num); +]] + + +local _M = {} + +-- Takes over requests of type kafka upstream in the http_access phase.
+function _M.access(api_ctx) + local pubsub, err = core.pubsub.new() + if not pubsub then + core.log.error("failed to initialize pub-sub module, err: ", err) + core.response.exit(400) + return + end + + local up_nodes = api_ctx.matched_upstream.nodes + + -- kafka client broker-related configuration + local broker_list = {} + for i, node in ipairs(up_nodes) do + broker_list[i] = { + host = node.host, + port = node.port, + } + + if api_ctx.kafka_consumer_enable_sasl then + broker_list[i].sasl_config = { + mechanism = "PLAIN", + user = api_ctx.kafka_consumer_sasl_username, + password = api_ctx.kafka_consumer_sasl_password, + } + end + end + + -- kafka client socket-related configuration (TLS, ssl verify) + local client_config = {refresh_interval = 30 * 60 * 1000} + if api_ctx.matched_upstream.tls then + client_config = { Review Comment: We should merge into the default one? ########## apisix/core/pubsub.lua: ########## @@ -0,0 +1,167 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+-- + +--- Extensible framework to support publish-and-subscribe scenarios +-- +-- @module core.pubsub + +local log = require("apisix.core.log") +local ws_server = require("resty.websocket.server") +local protoc = require("protoc") +local pb = require("pb") +local setmetatable = setmetatable +local pcall = pcall +local pairs = pairs + +protoc.reload() +pb.option("int64_as_string") +local pubsub_protoc = protoc.new() + + +local _M = { version = 0.1 } +local mt = { __index = _M } + + +--- +-- Create pubsub module instance +-- +-- @function core.pubsub.new +-- @treturn pubsub module instance +-- @treturn string|nil error message if present +-- @usage +-- local pubsub, err = core.pubsub.new() +function _M.new() + -- compile the protobuf file on initial load module + -- ensure that each worker is loaded once + if not pubsub_protoc.loaded["pubsub.proto"] then + pubsub_protoc:addpath("apisix/include/apisix/model") + local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto") + if not ok then + pubsub_protoc:reset() + return nil, "failed to load pubsub protocol: "..err + end + end + + local ws, err = ws_server:new() + if not ws then + return nil, err + end + + local obj = setmetatable({ + ws_server = ws, + cmd_handler = {}, + }, mt) + + return obj +end + + +--- +-- Add command callbacks to pubsub module instances +-- +-- The callback function prototype: function (params) +-- The params in the parameters contain the data defined in the requested command. +-- Its first return value is the data, which needs to contain the data needed for +-- the particular resp, returns nil if an error exists. +-- Its second return value is a string type error message, no need to return when +-- no error exists. 
+-- +-- @function core.pubsub.on +-- @usage +-- pubsub:on(command, function (params) +-- return data, err +-- end) +function _M.on(self, command, handler) + self.cmd_handler[command] = handler +end + + +--- +-- Put the pubsub instance into an event loop, waiting to process client commands +-- +-- @function core.pubsub.wait +-- @treturn string|nil error message if present, will terminate the event loop +-- @usage +-- local err = pubsub:wait() +function _M.wait(self) + local ws = self.ws_server + while true do + -- read raw data frames from websocket connection + local raw_data, raw_type, err = ws:recv_frame() + if err then + -- terminate the event loop when a fatal error occurs + if ws.fatal then + ws:send_close() + return "websocket server: "..err + end + + -- skip this loop for non-fatal errors + log.error("failed to receive websocket frame: "..err) + goto continue + end + + -- handle client close connection + if raw_type == "close" then + ws:send_close() + return + end + + -- the pub-sub messages use binary, if the message is not + -- binary, skip this message + if raw_type ~= "binary" then + goto continue + end + + local data = pb.decode("PubSubReq", raw_data) Review Comment: Should we check decode error? ########## t/pubsub/kafka.t: ########## @@ -0,0 +1,418 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# +use t::APISIX 'no_plan'; + +repeat_each(1); +no_long_string(); +no_root_location(); + +add_block_preprocessor(sub { + my ($block) = @_; + + if ((!defined $block->error_log) && (!defined $block->no_error_log)) { + $block->set_value("no_error_log", "[error]"); + } + + if (!defined $block->request) { + $block->set_value("request", "GET /t"); + } +}); + +run_tests(); + +__DATA__ + +=== TEST 1: setup all-in-one test +--- config + location /t { + content_by_lua_block { + local data = { + { + url = "/apisix/admin/routes/kafka", + data = [[{ + "upstream": { + "nodes": { + "127.0.0.1:9092": 1 + }, + "type": "none", + "scheme": "kafka" + }, + "uri": "/kafka" + }]], + }, + { + url = "/apisix/admin/routes/kafka-tlsv", + data = [[{ + "upstream": { + "nodes": { + "127.0.0.1:9093": 1 + }, + "type": "none", + "scheme": "kafka", + "tls": { + "verify": true + } + }, + "uri": "/kafka-tlsv" + }]], + }, + { + url = "/apisix/admin/routes/kafka-tls", + data = [[{ + "upstream": { + "nodes": { + "127.0.0.1:9093": 1 + }, + "type": "none", + "scheme": "kafka", + "tls": { + "verify": false + } + }, + "uri": "/kafka-tls" + }]], + }, + { + url = "/apisix/admin/routes/kafka-sasl", + data = [[{ + "upstream": { + "nodes": { + "127.0.0.1:9094": 1 + }, + "type": "none", + "scheme": "kafka" + }, + "uri": "/kafka-sasl", + "plugins": { + "kafka-proxy": { + "enable_sasl": true, + "sasl": { + "username": "admin", + "password": "admin-secret" + } + } + } + }]], + } + } + + local t = require("lib.test_admin").test + + for _, data in ipairs(data) do + local code, body = t(data.url, ngx.HTTP_PUT, data.data) + ngx.say(code..body) + end + } + } +--- response_body eval +"201passed\n"x4 + + + +=== TEST 2: hit route (with HTTP request) +--- request +GET /kafka +--- error_code: 400 +--- error_log +failed to initialize pub-sub module, err: bad "upgrade" request header: nil + + + +=== TEST 3: hit route 
(normal Kafka) +--- config + location /t { + content_by_lua_block { + local protoc = require("protoc") + local pb = require("pb") + protoc.reload() + pb.option("int64_as_string") + local pubsub_protoc = protoc.new() + pubsub_protoc:addpath("apisix/include/apisix/model") + local ok, err = pcall(pubsub_protoc.loadfile, pubsub_protoc, "pubsub.proto") + if not ok then + ngx.say("failed to load protocol: " .. err) + return + end + + local client = require "resty.websocket.client" + local ws, err = client:new() + local ok, err = ws:connect("ws://127.0.0.1:1984/kafka") + if not ok then + ngx.say("failed to connect: " .. err) + return + end + + local data = { + { + sequence = 0, + cmd_kafka_list_offset = { + topic = "not-exist", + partition = 0, + timestamp = -1, + }, + }, + { + sequence = 1, + cmd_kafka_fetch = { + topic = "not-exist", + partition = 0, + offset = 0, + }, + }, + { + sequence = 2, + cmd_kafka_list_offset = { + topic = "test-consumer", + partition = 0, + timestamp = -2, + }, + }, + { + sequence = 3, + cmd_kafka_list_offset = { + topic = "test-consumer", + partition = 0, + timestamp = -1, + }, + }, + { + sequence = 4, + cmd_kafka_fetch = { + topic = "test-consumer", + partition = 0, + offset = 14, + }, + } + } + + for i = 1, #data do + local _, err = ws:send_binary(pb.encode("PubSubReq", data[i])) + local raw_data, raw_type, err = ws:recv_frame() + if not raw_data then + ngx.say("failed to receive the frame: ", err) + return + end + local data, err = pb.decode("PubSubResp", raw_data) + if not data then + ngx.say("failed to decode the frame: ", err) + return + end + + if data.error_resp then + ngx.say(data.sequence..data.error_resp.message) + end + if data.kafka_list_offset_resp then + ngx.say(data.sequence.."offset: "..data.kafka_list_offset_resp.offset) + end + if data.kafka_fetch_resp then + ngx.say(data.sequence.."offset: "..data.kafka_fetch_resp.messages[1].offset.. 
+ " msg: "..data.kafka_fetch_resp.messages[1].value) + end + end + + ws:send_close() + } + } +--- response_body +0failed to list offset, topic: not-exist, partition: 0, err: not found topic +1failed to fetch message, topic: not-exist, partition: 0, err: not found topic +2offset: 0 +3offset: 30 +4offset: 14 msg: testmsg15 Review Comment: Let's add a comment about where the msg is from -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
