diff --git a/apisix/stream/plugins/mqtt-proxy.lua b/apisix/stream/plugins/mqtt-proxy.lua index 2c421dcc2c49..f075e204db95 100644 --- a/apisix/stream/plugins/mqtt-proxy.lua +++ b/apisix/stream/plugins/mqtt-proxy.lua @@ -15,8 +15,6 @@ -- limitations under the License. -- local core = require("apisix.core") -local upstream = require("apisix.upstream") -local ipmatcher = require("resty.ipmatcher") local bit = require("bit") local ngx = ngx local str_byte = string.byte @@ -32,20 +30,7 @@ local schema = { type = "object", properties = { protocol_name = {type = "string"}, - protocol_level = {type = "integer"}, - upstream = { - description = "Deprecated. We should configure upstream outside of the plugin", - type = "object", - properties = { - ip = {type = "string"}, -- deprecated, use "host" instead - host = {type = "string"}, - port = {type = "number"}, - }, - oneOf = { - {required = {"host", "port"}}, - {required = {"ip", "port"}}, - }, - } + protocol_level = {type = "integer"} }, required = {"protocol_name", "protocol_level"}, } @@ -189,48 +174,6 @@ function _M.preread(conf, ctx) if res.client_id ~= "" then ctx.mqtt_client_id = res.client_id end - - if not conf.upstream then - return - end - - local host = conf.upstream.host - if not host then - host = conf.upstream.ip - end - - if conf.host_is_domain == nil then - conf.host_is_domain = not ipmatcher.parse_ipv4(host) - and not ipmatcher.parse_ipv6(host) - end - - if conf.host_is_domain then - local ip, err = core.resolver.parse_domain(host) - if not ip then - core.log.error("failed to parse host ", host, ", err: ", err) - return 503 - end - - host = ip - end - - local up_conf = { - type = "roundrobin", - nodes = { - {host = host, port = conf.upstream.port, weight = 1}, - } - } - - local ok, err = upstream.check_schema(up_conf) - if not ok then - core.log.error("failed to check schema ", core.json.delay_encode(up_conf), - ", err: ", err) - return 503 - end - - local matched_route = ctx.matched_route - upstream.set(ctx, up_conf.type .. "#route_" .. matched_route.value.id, - ctx.conf_version, up_conf) return end diff --git a/docs/en/latest/plugins/mqtt-proxy.md b/docs/en/latest/plugins/mqtt-proxy.md index 6b5dd18b43fc..92c982290573 100644 --- a/docs/en/latest/plugins/mqtt-proxy.md +++ b/docs/en/latest/plugins/mqtt-proxy.md @@ -39,10 +39,6 @@ This Plugin supports both the protocols [3.1.*](http://docs.oasis-open.org/mqtt/ |----------------|---------|------------|-----------------------------------------------------------------------------------| | protocol_name | string | True | Name of the protocol. Generally `MQTT`. | | protocol_level | integer | True | Level of the protocol. It should be `4` for MQTT `3.1.*` and `5` for MQTT `5.0`. | -| upstream | object | Deprecated | Use separate Upstream in the Route instead. | -| upstream.host | string | True | IP or host of the upstream to forward the current request to. | -| upstream.ip | string | Deprecated | Use `host` instead. IP address of the upstream to forward the current request to. | -| upstream.port | number | True | Port of the upstream to forward the current request to. | ## Enabling the Plugin diff --git a/docs/en/latest/stand-alone.md b/docs/en/latest/stand-alone.md index 736bd39601a3..63a769533c7a 100644 --- a/docs/en/latest/stand-alone.md +++ b/docs/en/latest/stand-alone.md @@ -284,9 +284,6 @@ stream_routes: mqtt-proxy: protocol_name: "MQTT" protocol_level: 4 - upstream: - ip: "127.0.0.1" - port: 1995 upstreams: - nodes: "127.0.0.1:1995": 1 diff --git a/docs/zh/latest/plugins/mqtt-proxy.md b/docs/zh/latest/plugins/mqtt-proxy.md index d9902e7109af..45b86e51416b 100644 --- a/docs/zh/latest/plugins/mqtt-proxy.md +++ b/docs/zh/latest/plugins/mqtt-proxy.md @@ -39,10 +39,6 @@ description: 本文档介绍了 Apache APISIX mqtt-proxy 插件的信息,通 | -------------- | ------- | ----- | ------------------------------------------------------ | | protocol_name | string | 是 | 协议名称,正常情况下应为 `MQTT`。 | | protocol_level | integer | 是 | 协议级别,MQTT `3.1.*` 为 `4`,MQTT `5.0` 应是`5`。 | -| upstream | object | 废弃 | 推荐使用 Route 上配置的上游信息。 | -| upstream.host | string | 是 | 将当前请求转发到的上游的 IP 地址或域名。 | -| upstream.ip | string | 废弃 | 推荐使用 `host` 代替。将当前请求转发到的上游的 IP 地址。 | -| upstream.port | number | 是 | 将当前请求转发到的上游的端口。 | ## 启用插件 diff --git a/docs/zh/latest/stand-alone.md b/docs/zh/latest/stand-alone.md index 07f7425afad7..7effb17b6da6 100644 --- a/docs/zh/latest/stand-alone.md +++ b/docs/zh/latest/stand-alone.md @@ -282,9 +282,6 @@ stream_routes: mqtt-proxy: protocol_name: "MQTT" protocol_level: 4 - upstream: - ip: "127.0.0.1" - port: 1995 upstreams: - nodes: "127.0.0.1:1995": 1 diff --git a/t/admin/stream-routes.t b/t/admin/stream-routes.t index 7cc489dc9185..244e7607dcb2 100644 --- a/t/admin/stream-routes.t +++ b/t/admin/stream-routes.t @@ -221,12 +221,19 @@ GET /t "plugins": { "mqtt-proxy": { "protocol_name": "MQTT", - "protocol_level": 4, - "upstream": { - "ip": "127.0.0.1", - "port": 1980 - } + "protocol_level": 4 } + }, + "upstream": { + "type": "chash", + "key": "mqtt_client_id", + "nodes": [ + { + "host": "127.0.0.1", + "port": 1980, + "weight": 1 + } + ] } }]] ) @@ -260,12 +267,19 @@ passed "plugins": { "mqtt-proxy": { "protocol_name": "MQTT", - "protocol_level": 4, - "upstream": { - "ip": "127.0.0.1", - "port": 1980 - } + "protocol_level": 4 } + }, + "upstream": { + "type": "chash", + "key": "mqtt_client_id", + "nodes": [ + { + "host": "127.0.0.1", + "port": 1980, + "weight": 1 + } + ] } }]] ) diff --git a/t/config-center-yaml/stream-route.t b/t/config-center-yaml/stream-route.t index b6bfabff9aa3..df28a3c0f276 100644 --- a/t/config-center-yaml/stream-route.t +++ b/t/config-center-yaml/stream-route.t @@ -113,9 +113,6 @@ stream_routes: mqtt-proxy: protocol_name: "MQTT" protocol_level: 4 - upstream: - ip: "127.0.0.1" - port: 1995 upstreams: - nodes: "127.0.0.1:1995": 1 diff --git a/t/debug/debug-mode.t b/t/debug/debug-mode.t index 0fe20a8bb922..bbc1d7457437 100644 --- a/t/debug/debug-mode.t +++ b/t/debug/debug-mode.t @@ -321,12 +321,19 @@ passed "plugins": { "mqtt-proxy": { "protocol_name": "MQTT", - "protocol_level": 4, - "upstream": { - "ip": "127.0.0.1", - "port": 1995 - } + "protocol_level": 4 } + }, + "upstream": { + "type": "chash", + "key": "mqtt_client_id", + "nodes": [ + { + "host": "127.0.0.1", + "port": 1995, + "weight": 1 + } + ] } }]] ) diff --git a/t/stream-plugin/mqtt-proxy.t b/t/stream-plugin/mqtt-proxy.t index 4a59e376d362..69403f380f85 100644 --- a/t/stream-plugin/mqtt-proxy.t +++ b/t/stream-plugin/mqtt-proxy.t @@ -39,12 +39,19 @@ __DATA__ "plugins": { "mqtt-proxy": { "protocol_name": "MQTT", - "protocol_level": 4, - "upstream": { + "protocol_level": 4 + } + }, + "upstream": { + "type": "chash", + "key": "mqtt_client_id", + "nodes": [ + { "host": "127.0.0.1", - "port": 1995 + "port": 1995, + "weight": 1 } - } + ] } }]] ) @@ -132,7 +139,7 @@ match(): not hit any route -=== TEST 6: check schema +=== TEST 6: set route with host --- config location /t { content_by_lua_block { @@ -145,51 +152,22 @@ match(): not hit any route "plugins": { "mqtt-proxy": { "protocol_name": "MQTT", - "protocol_level": 4, - "upstream": { - "host": "127.0.0.1" - } + "protocol_level": 4 } - } - }]] - ) - - if code >= 300 then - ngx.status = code - end - ngx.print(body) - } - } ---- request -GET /t ---- error_code: 400 ---- response_body -{"error_msg":"failed to check the configuration of stream plugin [mqtt-proxy]: property \"upstream\" validation failed: value should match only one schema, but matches none"} - - - -=== TEST 7: set route with host ---- config - location /t { - content_by_lua_block { - local t = require("lib.test_admin").test - local code, body = t('/apisix/admin/stream_routes/1', - ngx.HTTP_PUT, - [[{ - "remote_addr": "127.0.0.1", - "server_port": 1985, - "plugins": { - "mqtt-proxy": { - "protocol_name": "MQTT", - "protocol_level": 4, - "upstream": { + }, + "upstream": { + "type": "chash", + "key": "mqtt_client_id", + "nodes": [ + { "host": "localhost", - "port": 1995 + "port": 1995, + "weight": 1 } - } + ] } }]] - ) + ) if code >= 300 then ngx.status = code @@ -206,7 +184,7 @@ passed -=== TEST 8: hit route +=== TEST 7: hit route --- stream_request eval "\x10\x0f\x00\x04\x4d\x51\x54\x54\x04\x02\x00\x3c\x00\x03\x66\x6f\x6f" --- stream_response @@ -216,54 +194,7 @@ hello world -=== TEST 9: set route with invalid host ---- config - location /t { - content_by_lua_block { - local t = require("lib.test_admin").test - local code, body = t('/apisix/admin/stream_routes/1', - ngx.HTTP_PUT, - [[{ - "remote_addr": "127.0.0.1", - "server_port": 1985, - "plugins": { - "mqtt-proxy": { - "protocol_name": "MQTT", - "protocol_level": 4, - "upstream": { - "host": "loc", - "port": 1995 - } - } - } - }]] - ) - - if code >= 300 then - ngx.status = code - end - ngx.say(body) - } - } ---- request -GET /t ---- response_body -passed ---- no_error_log -[error] - - - -=== TEST 10: hit route ---- stream_request eval -"\x10\x0f\x00\x04\x4d\x51\x54\x54\x04\x02\x00\x3c\x00\x03\x66\x6f\x6f" ---- error_log -failed to parse domain: loc, error: ---- timeout: 10 - - - -=== TEST 11: set route with upstream +=== TEST 8: set route with upstream --- config location /t { content_by_lua_block { @@ -305,7 +236,7 @@ passed -=== TEST 12: hit route +=== TEST 9: hit route --- stream_request eval "\x10\x0f\x00\x04\x4d\x51\x54\x54\x04\x02\x00\x3c\x00\x03\x66\x6f\x6f" --- stream_response @@ -319,7 +250,7 @@ mqtt client id: foo -=== TEST 13: hit route with empty client id +=== TEST 10: hit route with empty client id --- stream_request eval "\x10\x0c\x00\x04\x4d\x51\x54\x54\x04\x02\x00\x3c\x00\x00" --- stream_response @@ -332,7 +263,7 @@ qr/mqtt client id: \w+/ -=== TEST 14: MQTT 5 +=== TEST 11: MQTT 5 --- config location /t { content_by_lua_block { @@ -374,7 +305,7 @@ passed -=== TEST 15: hit route with empty property +=== TEST 12: hit route with empty property --- stream_request eval "\x10\x0d\x00\x04\x4d\x51\x54\x54\x05\x02\x00\x3c\x00\x00\x00" --- stream_response @@ -387,7 +318,7 @@ qr/mqtt client id: \w+/ -=== TEST 16: hit route with property +=== TEST 13: hit route with property --- stream_request eval "\x10\x1b\x00\x04\x4d\x51\x54\x54\x05\x02\x00\x3c\x05\x11\x00\x00\x0e\x10\x00\x09\x63\x6c\x69\x6e\x74\x2d\x31\x31\x31" --- stream_response @@ -401,7 +332,7 @@ mqtt client id: clint-111 -=== TEST 17: balance with mqtt_client_id +=== TEST 14: balance with mqtt_client_id --- config location /t { content_by_lua_block { @@ -451,7 +382,7 @@ passed -=== TEST 18: hit route with empty id +=== TEST 15: hit route with empty id --- stream_request eval "\x10\x0d\x00\x04\x4d\x51\x54\x54\x05\x02\x00\x3c\x00\x00\x00" --- stream_response @@ -465,7 +396,7 @@ proxy request to 127.0.0.1:1995 -=== TEST 19: hit route with different client id, part 1 +=== TEST 16: hit route with different client id, part 1 --- stream_request eval "\x10\x0e\x00\x04\x4d\x51\x54\x54\x05\x02\x00\x3c\x00\x00\x01\x66" --- stream_response @@ -480,7 +411,7 @@ proxy request to 0.0.0.0:1995 -=== TEST 20: hit route with different client id, part 2 +=== TEST 17: hit route with different client id, part 2 --- stream_request eval "\x10\x0e\x00\x04\x4d\x51\x54\x54\x05\x02\x00\x3c\x00\x00\x01\x67" --- stream_response diff --git a/t/stream-plugin/mqtt-proxy2.t b/t/stream-plugin/mqtt-proxy2.t new file mode 100644 index 000000000000..e387b26dce1a --- /dev/null +++ b/t/stream-plugin/mqtt-proxy2.t @@ -0,0 +1,79 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +use t::APISIX 'no_plan'; + +repeat_each(1); +no_long_string(); +no_shuffle(); +no_root_location(); + +run_tests; + +__DATA__ + +=== TEST 1: set route with invalid host +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/stream_routes/1', + ngx.HTTP_PUT, + [[{ + "remote_addr": "127.0.0.1", + "server_port": 1985, + "plugins": { + "mqtt-proxy": { + "protocol_name": "MQTT", + "protocol_level": 4 + } + }, + "upstream": { + "type": "chash", + "key": "mqtt_client_id", + "nodes": [ + { + "host": "loc", + "port": 1995, + "weight": 1 + } + ] + } + }]] + ) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 2: hit route +--- stream_request eval +"\x10\x0f\x00\x04\x4d\x51\x54\x54\x04\x02\x00\x3c\x00\x03\x66\x6f\x6f" +--- error_log +failed to parse domain: loc, error: +--- timeout: 10