This is an automated email from the ASF dual-hosted git repository.

tokers pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 8bf8cc1  feat(mqtt-proxy): support domain (#4391)
8bf8cc1 is described below

commit 8bf8cc1916a7d51f9f3f7aae8fd5aa6c12415bda
Author: 罗泽轩 <[email protected]>
AuthorDate: Wed Jun 9 19:23:40 2021 +0800

    feat(mqtt-proxy): support domain (#4391)
    
    Signed-off-by: spacewander <[email protected]>
---
 apisix/plugin.lua                    |  20 +++--
 apisix/stream/plugins/mqtt-proxy.lua |  36 ++++++++-
 docs/en/latest/plugins/mqtt-proxy.md |   5 +-
 docs/zh/latest/plugins/mqtt-proxy.md |  10 +--
 t/stream-plugin/mqtt-proxy.t         | 138 ++++++++++++++++++++++++++++++++++-
 5 files changed, 187 insertions(+), 22 deletions(-)

diff --git a/apisix/plugin.lua b/apisix/plugin.lua
index d9d8e2d..7dead03 100644
--- a/apisix/plugin.lua
+++ b/apisix/plugin.lua
@@ -17,6 +17,7 @@
 local require       = require
 local core          = require("apisix.core")
 local config_util   = require("apisix.core.config_util")
+local ngx_exit      = ngx.exit
 local pkg_loaded    = package.loaded
 local sort_tab      = table.sort
 local pcall         = pcall
@@ -27,6 +28,7 @@ local local_plugins = core.table.new(32, 0)
 local ngx           = ngx
 local tostring      = tostring
 local error         = error
+local is_http       = ngx.config.subsystem == "http"
 local local_plugins_hash    = core.table.new(0, 32)
 local stream_local_plugins  = core.table.new(32, 0)
 local stream_local_plugins_hash = core.table.new(0, 32)
@@ -276,8 +278,6 @@ local function trace_plugins_info_for_debug(plugins)
         return
     end
 
-    local is_http = ngx.config.subsystem == "http"
-
     if not plugins then
         if is_http and not ngx.headers_sent then
             core.response.add_header("Apisix-Plugins", "no plugin")
@@ -641,11 +641,19 @@ function _M.run_plugin(phase, plugins, api_ctx)
             if phase_func then
                 local code, body = phase_func(plugins[i + 1], api_ctx)
                 if code or body then
-                    if code >= 400 then
-                        core.log.warn(plugins[i].name, " exits with http 
status code ", code)
+                    if is_http then
+                        if code >= 400 then
+                            core.log.warn(plugins[i].name, " exits with http 
status code ", code)
+                        end
+
+                        core.response.exit(code, body)
+                    else
+                        if code >= 400 then
+                            core.log.warn(plugins[i].name, " exits with status 
code ", code)
+                        end
+
+                        ngx_exit(1)
                     end
-
-                    core.response.exit(code, body)
                 end
             end
         end
diff --git a/apisix/stream/plugins/mqtt-proxy.lua 
b/apisix/stream/plugins/mqtt-proxy.lua
index f7cfd88..dfd05ae 100644
--- a/apisix/stream/plugins/mqtt-proxy.lua
+++ b/apisix/stream/plugins/mqtt-proxy.lua
@@ -16,6 +16,7 @@
 --
 local core      = require("apisix.core")
 local upstream  = require("apisix.upstream")
+local ipmatcher = require("resty.ipmatcher")
 local bit       = require("bit")
 local ngx       = ngx
 local ngx_exit  = ngx.exit
@@ -31,9 +32,14 @@ local schema = {
         upstream = {
             type = "object",
             properties = {
-                ip = {type = "string"},
+                ip = {type = "string"}, -- deprecated, use "host" instead
+                host = {type = "string"},
                 port = {type = "number"},
-            }
+            },
+            oneOf = {
+                {required = {"host", "port"}},
+                {required = {"ip", "port"}},
+            },
         }
     },
     required = {"protocol_name", "protocol_level", "upstream"},
@@ -159,16 +165,38 @@ function _M.preread(conf, ctx)
 
     core.log.info("mqtt client id: ", res.client_id)
 
+    local host = conf.upstream.host
+    if not host then
+        host = conf.upstream.ip
+    end
+
+    if conf.host_is_domain == nil then
+        conf.host_is_domain = not ipmatcher.parse_ipv4(host)
+                              and not ipmatcher.parse_ipv6(host)
+    end
+
+    if conf.host_is_domain then
+        local ip, err = core.resolver.parse_domain(host)
+        if not ip then
+            core.log.error("failed to parse host ", host, ", err: ", err)
+            return 500
+        end
+
+        host = ip
+    end
+
     local up_conf = {
         type = "roundrobin",
         nodes = {
-            {host = conf.upstream.ip, port = conf.upstream.port, weight = 1},
+            {host = host, port = conf.upstream.port, weight = 1},
         }
     }
 
     local ok, err = upstream.check_schema(up_conf)
     if not ok then
-        return 500, err
+        core.log.error("failed to check schema ", 
core.json.delay_encode(up_conf),
+                       ", err: ", err)
+        return 500
     end
 
     local matched_route = ctx.matched_route
diff --git a/docs/en/latest/plugins/mqtt-proxy.md 
b/docs/en/latest/plugins/mqtt-proxy.md
index ab7f502..e90a071 100644
--- a/docs/en/latest/plugins/mqtt-proxy.md
+++ b/docs/en/latest/plugins/mqtt-proxy.md
@@ -41,7 +41,8 @@ And this plugin both support MQTT protocol 
[3.1.*](http://docs.oasis-open.org/mq
 | -------------- | ------- | ----------- | ------- | ----- | 
--------------------------------------------------------------------------------------
 |
 | protocol_name  | string  | required    |         |       | Name of protocol, 
should be `MQTT` in normal.                                          |
 | protocol_level | integer | required    |         |       | Level of 
protocol, it should be `4` for MQTT `3.1.*`. it should be `5` for MQTT `5.0`. |
-| upstream.ip    | string  | required    |         |       | IP address of 
upstream, will forward current request to.                               |
+| upstream.host  | string  | required    |         |       | the IP or host of 
upstream, will forward current request to.                           |
+| upstream.ip    | string  | deprecated  |         |       | Use "host" 
instead. IP address of upstream, will forward current request to.|
 | upstream.port  | number  | required    |         |       | Port of upstream, 
will forward current request to.                                     |
 
 ## How To Enable
@@ -74,7 +75,7 @@ curl http://127.0.0.1:9080/apisix/admin/stream_routes/1 -H 
'X-API-KEY: edd1c9f03
             "protocol_name": "MQTT",
             "protocol_level": 4,
             "upstream": {
-                "ip": "127.0.0.1",
+                "host": "127.0.0.1",
                 "port": 1980
             }
         }
diff --git a/docs/zh/latest/plugins/mqtt-proxy.md 
b/docs/zh/latest/plugins/mqtt-proxy.md
index 5eb2661..7a848c0 100644
--- a/docs/zh/latest/plugins/mqtt-proxy.md
+++ b/docs/zh/latest/plugins/mqtt-proxy.md
@@ -36,16 +36,12 @@ title: mqtt-proxy
 
 ## 属性
 
-* `protocol_name`: 必选,协议名称,正常情况下应为“ MQTT” 。
-* `protocol_level`: 必选,协议级别,MQTT `3.1.*` 应为 “4” ,MQTT `5.0` 应该是“5”。
-* `upstream.ip`: 必选,将当前请求转发到的上游的 IP 地址,
-* `upstream.port`: 必选,将当前请求转发到的上游的 端口,
-
 | 名称           | 类型    | 必选项 | 默认值 | 有效值 | 描述                                  
                 |
 | -------------- | ------- | ------ | ------ | ------ | 
------------------------------------------------------ |
 | protocol_name  | string  | 必须   |        |        | 协议名称,正常情况下应为“ MQTT”      
                  |
 | protocol_level | integer | 必须   |        |        | 协议级别,MQTT `3.1.*` 应为 `4` 
,MQTT `5.0` 应是`5`。 |
-| upstream.ip    | string  | 必须   |        |        | 将当前请求转发到的上游的 IP 地址       
                |
+| upstream.host  | string  | 必须   |        |        | 将当前请求转发到的上游的 IP 地址或域名    
              |
+| upstream.ip    | string  | 废弃   |        |        | 
推荐使用“host”代替。将当前请求转发到的上游的 IP 地址                       |
 | upstream.port  | number  | 必须   |        |        | 将当前请求转发到的上游的端口           
                |
 
 ## 如何启用
@@ -77,7 +73,7 @@ curl http://127.0.0.1:9080/apisix/admin/stream_routes/1 -H 
'X-API-KEY: edd1c9f03
             "protocol_name": "MQTT",
             "protocol_level": 4,
             "upstream": {
-                "ip": "127.0.0.1",
+                "host": "127.0.0.1",
                 "port": 1980
             }
         }
diff --git a/t/stream-plugin/mqtt-proxy.t b/t/stream-plugin/mqtt-proxy.t
index 82f5453..5e74823 100644
--- a/t/stream-plugin/mqtt-proxy.t
+++ b/t/stream-plugin/mqtt-proxy.t
@@ -17,7 +17,7 @@
 
 use t::APISIX 'no_plan';
 
-repeat_each(1);
+repeat_each(2);
 no_long_string();
 no_shuffle();
 no_root_location();
@@ -41,7 +41,7 @@ __DATA__
                             "protocol_name": "MQTT",
                             "protocol_level": 4,
                             "upstream": {
-                                "ip": "127.0.0.1",
+                                "host": "127.0.0.1",
                                 "port": 1995
                             }
                         }
@@ -99,7 +99,7 @@ hello world
                             "protocol_name": "MQTT",
                             "protocol_level": 4,
                             "upstream": {
-                                "ip": "127.0.0.1",
+                                "host": "127.0.0.1",
                                 "port": 1995
                             }
                         }
@@ -132,3 +132,135 @@ receive stream response error: connection reset by peer
 receive stream response error: connection reset by peer
 --- error_log
 match(): not hit any route
+
+
+
+=== TEST 6: check schema
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/stream_routes/1',
+                ngx.HTTP_PUT,
+                [[{
+                    "remote_addr": "127.0.0.1",
+                    "server_port": 1985,
+                    "plugins": {
+                        "mqtt-proxy": {
+                            "protocol_name": "MQTT",
+                            "protocol_level": 4,
+                            "upstream": {
+                                "host": "127.0.0.1"
+                            }
+                        }
+                    }
+                }]]
+                )
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.print(body)
+        }
+    }
+--- request
+GET /t
+--- error_code: 400
+--- response_body
+{"error_msg":"failed to check the configuration of stream plugin [mqtt-proxy]: 
property \"upstream\" validation failed: value should match only one schema, 
but matches none"}
+
+
+
+=== TEST 7: set route with host
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/stream_routes/1',
+                ngx.HTTP_PUT,
+                [[{
+                    "remote_addr": "127.0.0.1",
+                    "server_port": 1985,
+                    "plugins": {
+                        "mqtt-proxy": {
+                            "protocol_name": "MQTT",
+                            "protocol_level": 4,
+                            "upstream": {
+                                "host": "localhost",
+                                "port": 1995
+                            }
+                        }
+                    }
+                }]]
+                )
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 8: hit route
+--- stream_enable
+--- stream_request eval
+"\x10\x0f\x00\x04\x4d\x51\x54\x54\x04\x02\x00\x3c\x00\x03\x66\x6f\x6f"
+--- stream_response
+hello world
+--- no_error_log
+[error]
+
+
+
+=== TEST 9: set route with invalid host
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/stream_routes/1',
+                ngx.HTTP_PUT,
+                [[{
+                    "remote_addr": "127.0.0.1",
+                    "server_port": 1985,
+                    "plugins": {
+                        "mqtt-proxy": {
+                            "protocol_name": "MQTT",
+                            "protocol_level": 4,
+                            "upstream": {
+                                "host": "loc",
+                                "port": 1995
+                            }
+                        }
+                    }
+                }]]
+                )
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 10: hit route
+--- stream_enable
+--- stream_request eval
+"\x10\x0f\x00\x04\x4d\x51\x54\x54\x04\x02\x00\x3c\x00\x03\x66\x6f\x6f"
+--- error_log
+failed to parse domain: loc, error:

Reply via email to