This is an automated email from the ASF dual-hosted git repository.
tokers pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new 8bf8cc1 feat(mqtt-proxy): support domain (#4391)
8bf8cc1 is described below
commit 8bf8cc1916a7d51f9f3f7aae8fd5aa6c12415bda
Author: 罗泽轩 <[email protected]>
AuthorDate: Wed Jun 9 19:23:40 2021 +0800
feat(mqtt-proxy): support domain (#4391)
Signed-off-by: spacewander <[email protected]>
---
apisix/plugin.lua | 20 +++--
apisix/stream/plugins/mqtt-proxy.lua | 36 ++++++++-
docs/en/latest/plugins/mqtt-proxy.md | 5 +-
docs/zh/latest/plugins/mqtt-proxy.md | 10 +--
t/stream-plugin/mqtt-proxy.t | 138 ++++++++++++++++++++++++++++++++++-
5 files changed, 187 insertions(+), 22 deletions(-)
diff --git a/apisix/plugin.lua b/apisix/plugin.lua
index d9d8e2d..7dead03 100644
--- a/apisix/plugin.lua
+++ b/apisix/plugin.lua
@@ -17,6 +17,7 @@
local require = require
local core = require("apisix.core")
local config_util = require("apisix.core.config_util")
+local ngx_exit = ngx.exit
local pkg_loaded = package.loaded
local sort_tab = table.sort
local pcall = pcall
@@ -27,6 +28,7 @@ local local_plugins = core.table.new(32, 0)
local ngx = ngx
local tostring = tostring
local error = error
+local is_http = ngx.config.subsystem == "http"
local local_plugins_hash = core.table.new(0, 32)
local stream_local_plugins = core.table.new(32, 0)
local stream_local_plugins_hash = core.table.new(0, 32)
@@ -276,8 +278,6 @@ local function trace_plugins_info_for_debug(plugins)
return
end
- local is_http = ngx.config.subsystem == "http"
-
if not plugins then
if is_http and not ngx.headers_sent then
core.response.add_header("Apisix-Plugins", "no plugin")
@@ -641,11 +641,19 @@ function _M.run_plugin(phase, plugins, api_ctx)
if phase_func then
local code, body = phase_func(plugins[i + 1], api_ctx)
if code or body then
- if code >= 400 then
- core.log.warn(plugins[i].name, " exits with http
status code ", code)
+ if is_http then
+ if code >= 400 then
+ core.log.warn(plugins[i].name, " exits with http
status code ", code)
+ end
+
+ core.response.exit(code, body)
+ else
+ if code >= 400 then
+ core.log.warn(plugins[i].name, " exits with status
code ", code)
+ end
+
+ ngx_exit(1)
end
-
- core.response.exit(code, body)
end
end
end
diff --git a/apisix/stream/plugins/mqtt-proxy.lua
b/apisix/stream/plugins/mqtt-proxy.lua
index f7cfd88..dfd05ae 100644
--- a/apisix/stream/plugins/mqtt-proxy.lua
+++ b/apisix/stream/plugins/mqtt-proxy.lua
@@ -16,6 +16,7 @@
--
local core = require("apisix.core")
local upstream = require("apisix.upstream")
+local ipmatcher = require("resty.ipmatcher")
local bit = require("bit")
local ngx = ngx
local ngx_exit = ngx.exit
@@ -31,9 +32,14 @@ local schema = {
upstream = {
type = "object",
properties = {
- ip = {type = "string"},
+ ip = {type = "string"}, -- deprecated, use "host" instead
+ host = {type = "string"},
port = {type = "number"},
- }
+ },
+ oneOf = {
+ {required = {"host", "port"}},
+ {required = {"ip", "port"}},
+ },
}
},
required = {"protocol_name", "protocol_level", "upstream"},
@@ -159,16 +165,38 @@ function _M.preread(conf, ctx)
core.log.info("mqtt client id: ", res.client_id)
+ local host = conf.upstream.host
+ if not host then
+ host = conf.upstream.ip
+ end
+
+ if conf.host_is_domain == nil then
+ conf.host_is_domain = not ipmatcher.parse_ipv4(host)
+ and not ipmatcher.parse_ipv6(host)
+ end
+
+ if conf.host_is_domain then
+ local ip, err = core.resolver.parse_domain(host)
+ if not ip then
+ core.log.error("failed to parse host ", host, ", err: ", err)
+ return 500
+ end
+
+ host = ip
+ end
+
local up_conf = {
type = "roundrobin",
nodes = {
- {host = conf.upstream.ip, port = conf.upstream.port, weight = 1},
+ {host = host, port = conf.upstream.port, weight = 1},
}
}
local ok, err = upstream.check_schema(up_conf)
if not ok then
- return 500, err
+ core.log.error("failed to check schema ",
core.json.delay_encode(up_conf),
+ ", err: ", err)
+ return 500
end
local matched_route = ctx.matched_route
diff --git a/docs/en/latest/plugins/mqtt-proxy.md
b/docs/en/latest/plugins/mqtt-proxy.md
index ab7f502..e90a071 100644
--- a/docs/en/latest/plugins/mqtt-proxy.md
+++ b/docs/en/latest/plugins/mqtt-proxy.md
@@ -41,7 +41,8 @@ And this plugin both support MQTT protocol
[3.1.*](http://docs.oasis-open.org/mq
| -------------- | ------- | ----------- | ------- | ----- |
--------------------------------------------------------------------------------------
|
| protocol_name | string | required | | | Name of protocol,
should be `MQTT` in normal. |
| protocol_level | integer | required | | | Level of
protocol, it should be `4` for MQTT `3.1.*`. it should be `5` for MQTT `5.0`. |
-| upstream.ip | string | required | | | IP address of
upstream, will forward current request to. |
+| upstream.host | string | required | | | the IP or host of
upstream, will forward current request to. |
+| upstream.ip | string | deprecated | | | Use "host"
instead. IP address of upstream, will forward current request to.|
| upstream.port | number | required | | | Port of upstream,
will forward current request to. |
## How To Enable
@@ -74,7 +75,7 @@ curl http://127.0.0.1:9080/apisix/admin/stream_routes/1 -H
'X-API-KEY: edd1c9f03
"protocol_name": "MQTT",
"protocol_level": 4,
"upstream": {
- "ip": "127.0.0.1",
+ "host": "127.0.0.1",
"port": 1980
}
}
diff --git a/docs/zh/latest/plugins/mqtt-proxy.md
b/docs/zh/latest/plugins/mqtt-proxy.md
index 5eb2661..7a848c0 100644
--- a/docs/zh/latest/plugins/mqtt-proxy.md
+++ b/docs/zh/latest/plugins/mqtt-proxy.md
@@ -36,16 +36,12 @@ title: mqtt-proxy
## 属性
-* `protocol_name`: 必选,协议名称,正常情况下应为“ MQTT” 。
-* `protocol_level`: 必选,协议级别,MQTT `3.1.*` 应为 “4” ,MQTT `5.0` 应该是“5”。
-* `upstream.ip`: 必选,将当前请求转发到的上游的 IP 地址,
-* `upstream.port`: 必选,将当前请求转发到的上游的 端口,
-
| 名称 | 类型 | 必选项 | 默认值 | 有效值 | 描述
|
| -------------- | ------- | ------ | ------ | ------ |
------------------------------------------------------ |
| protocol_name | string | 必须 | | | 协议名称,正常情况下应为“ MQTT”
|
| protocol_level | integer | 必须 | | | 协议级别,MQTT `3.1.*` 应为 `4`
,MQTT `5.0` 应是`5`。 |
-| upstream.ip | string | 必须 | | | 将当前请求转发到的上游的 IP 地址
|
+| upstream.host | string | 必须 | | | 将当前请求转发到的上游的 IP 地址或域名
|
+| upstream.ip | string | 废弃 | | |
推荐使用“host”代替。将当前请求转发到的上游的 IP 地址 |
| upstream.port | number | 必须 | | | 将当前请求转发到的上游的端口
|
## 如何启用
@@ -77,7 +73,7 @@ curl http://127.0.0.1:9080/apisix/admin/stream_routes/1 -H
'X-API-KEY: edd1c9f03
"protocol_name": "MQTT",
"protocol_level": 4,
"upstream": {
- "ip": "127.0.0.1",
+ "host": "127.0.0.1",
"port": 1980
}
}
diff --git a/t/stream-plugin/mqtt-proxy.t b/t/stream-plugin/mqtt-proxy.t
index 82f5453..5e74823 100644
--- a/t/stream-plugin/mqtt-proxy.t
+++ b/t/stream-plugin/mqtt-proxy.t
@@ -17,7 +17,7 @@
use t::APISIX 'no_plan';
-repeat_each(1);
+repeat_each(2);
no_long_string();
no_shuffle();
no_root_location();
@@ -41,7 +41,7 @@ __DATA__
"protocol_name": "MQTT",
"protocol_level": 4,
"upstream": {
- "ip": "127.0.0.1",
+ "host": "127.0.0.1",
"port": 1995
}
}
@@ -99,7 +99,7 @@ hello world
"protocol_name": "MQTT",
"protocol_level": 4,
"upstream": {
- "ip": "127.0.0.1",
+ "host": "127.0.0.1",
"port": 1995
}
}
@@ -132,3 +132,135 @@ receive stream response error: connection reset by peer
receive stream response error: connection reset by peer
--- error_log
match(): not hit any route
+
+
+
+=== TEST 6: check schema
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/stream_routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "remote_addr": "127.0.0.1",
+ "server_port": 1985,
+ "plugins": {
+ "mqtt-proxy": {
+ "protocol_name": "MQTT",
+ "protocol_level": 4,
+ "upstream": {
+ "host": "127.0.0.1"
+ }
+ }
+ }
+ }]]
+ )
+
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.print(body)
+ }
+ }
+--- request
+GET /t
+--- error_code: 400
+--- response_body
+{"error_msg":"failed to check the configuration of stream plugin [mqtt-proxy]:
property \"upstream\" validation failed: value should match only one schema,
but matches none"}
+
+
+
+=== TEST 7: set route with host
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/stream_routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "remote_addr": "127.0.0.1",
+ "server_port": 1985,
+ "plugins": {
+ "mqtt-proxy": {
+ "protocol_name": "MQTT",
+ "protocol_level": 4,
+ "upstream": {
+ "host": "localhost",
+ "port": 1995
+ }
+ }
+ }
+ }]]
+ )
+
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 8: hit route
+--- stream_enable
+--- stream_request eval
+"\x10\x0f\x00\x04\x4d\x51\x54\x54\x04\x02\x00\x3c\x00\x03\x66\x6f\x6f"
+--- stream_response
+hello world
+--- no_error_log
+[error]
+
+
+
+=== TEST 9: set route with invalid host
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/stream_routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "remote_addr": "127.0.0.1",
+ "server_port": 1985,
+ "plugins": {
+ "mqtt-proxy": {
+ "protocol_name": "MQTT",
+ "protocol_level": 4,
+ "upstream": {
+ "host": "loc",
+ "port": 1995
+ }
+ }
+ }
+ }]]
+ )
+
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 10: hit route
+--- stream_enable
+--- stream_request eval
+"\x10\x0f\x00\x04\x4d\x51\x54\x54\x04\x02\x00\x3c\x00\x03\x66\x6f\x6f"
+--- error_log
+failed to parse domain: loc, error: