This is an automated email from the ASF dual-hosted git repository.
zhangjintao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push:
new 530ce52b feat: add mqtt-proxy plugin in ApisixRoute (#1056)
530ce52b is described below
commit 530ce52b278d7e385db3747d33e0d9fe1db0f3d1
Author: lsy <[email protected]>
AuthorDate: Sun Aug 21 22:20:15 2022 +0800
feat: add mqtt-proxy plugin in ApisixRoute (#1056)
---
.github/workflows/spell-checker.yml | 2 +-
docs/en/latest/references/apisix_route_v2.md | 4 +
pkg/api/validation/apisix_route_test.go | 2 +-
pkg/kube/apisix/apis/config/v2/types.go | 23 ++--
.../apisix/apis/config/v2/zz_generated.deepcopy.go | 49 +++++----
pkg/providers/apisix/translation/apisix_route.go | 15 +++
pkg/types/apisix/v1/types.go | 1 +
pkg/types/apisix/v1/zz_generated.deepcopy.go | 1 +
samples/deploy/crd/v1/ApisixRoute.yaml | 16 +++
test/e2e/go.mod | 1 +
test/e2e/go.sum | 6 +
test/e2e/scaffold/scaffold.go | 8 ++
.../suite-plugins-other/mqtt-proxy.go | 122 +++++++++++++++++++++
13 files changed, 217 insertions(+), 33 deletions(-)
diff --git a/.github/workflows/spell-checker.yml
b/.github/workflows/spell-checker.yml
index ef1d159a..074f3bda 100644
--- a/.github/workflows/spell-checker.yml
+++ b/.github/workflows/spell-checker.yml
@@ -35,5 +35,5 @@ jobs:
wget -O - -q https://git.io/misspell | sh -s -- -b .
- name: Misspell
run: |
- find . -name "*.go" -type f | xargs ./misspell -error
+ find . -name "*.go" -type f | xargs ./misspell -i mosquitto -error
find docs -type f | xargs ./misspell -error
diff --git a/docs/en/latest/references/apisix_route_v2.md
b/docs/en/latest/references/apisix_route_v2.md
index 341b9cbe..6deb9c23 100644
--- a/docs/en/latest/references/apisix_route_v2.md
+++ b/docs/en/latest/references/apisix_route_v2.md
@@ -73,6 +73,10 @@ Meaning of each field in the spec of ApisixRoute are
followed, the top level fie
| stream[].backend.servicePort | integer or string | The backend
service port, can be the port number or the name defined in the service object.
|
| stream[].backend.resolveGranularity | string | See [Service
Resolve Granularity](#service-resolve-granularity) for the details.
|
| stream[].backend.subset | string | Subset specifies
a subset for the target Service. The subset should be pre-defined in
ApisixUpstream about this service.
|
+| stream[].plugins | array | A series of
APISIX plugins that will be executed once this route rule is matched
|
+| stream[].plugins[].name | string | The plugin name,
see [docs](http://apisix.apache.org/docs/apisix/getting-started) for learning
the available plugins.
|
+| stream[].plugins[].enable | boolean | Whether the
plugin would be used
|
+| stream[].plugins[].config | object | The
configuration of the plugin that must have the same fields as in APISIX.
|
## Expression Operators
diff --git a/pkg/api/validation/apisix_route_test.go
b/pkg/api/validation/apisix_route_test.go
index 7249aa93..f51ba0a7 100644
--- a/pkg/api/validation/apisix_route_test.go
+++ b/pkg/api/validation/apisix_route_test.go
@@ -124,7 +124,7 @@ func Test_validatePlugin(t *testing.T) {
fakeClient := newFakeSchemaClient()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- gotValid, _ := validatePlugin(fakeClient,
tt.pluginName, v2.ApisixRouteHTTPPluginConfig(tt.pluginConfig))
+ gotValid, _ := validatePlugin(fakeClient,
tt.pluginName, v2.ApisixRoutePluginConfig(tt.pluginConfig))
if gotValid != tt.wantValid {
t.Errorf("validatePlugin() gotValid = %v, want
%v", gotValid, tt.wantValid)
}
diff --git a/pkg/kube/apisix/apis/config/v2/types.go
b/pkg/kube/apisix/apis/config/v2/types.go
index 6adc5c09..666d7b6a 100644
--- a/pkg/kube/apisix/apis/config/v2/types.go
+++ b/pkg/kube/apisix/apis/config/v2/types.go
@@ -70,7 +70,7 @@ type ApisixRouteHTTP struct {
Backends []ApisixRouteHTTPBackend `json:"backends,omitempty"
yaml:"backends,omitempty"`
Websocket bool `json:"websocket"
yaml:"websocket"`
PluginConfigName string
`json:"plugin_config_name,omitempty" yaml:"plugin_config_name,omitempty"`
- Plugins []ApisixRouteHTTPPlugin `json:"plugins,omitempty"
yaml:"plugins,omitempty"`
+ Plugins []ApisixRoutePlugin `json:"plugins,omitempty"
yaml:"plugins,omitempty"`
Authentication ApisixRouteAuthentication
`json:"authentication,omitempty" yaml:"authentication,omitempty"`
}
@@ -152,19 +152,19 @@ type ApisixRouteHTTPMatchExprSubject struct {
Name string `json:"name" yaml:"name"`
}
-// ApisixRouteHTTPPlugin represents an APISIX plugin.
-type ApisixRouteHTTPPlugin struct {
+// ApisixRoutePlugin represents an APISIX plugin.
+type ApisixRoutePlugin struct {
// The plugin name.
Name string `json:"name" yaml:"name"`
// Whether this plugin is in use, default is true.
Enable bool `json:"enable" yaml:"enable"`
// Plugin configuration.
- Config ApisixRouteHTTPPluginConfig `json:"config" yaml:"config"`
+ Config ApisixRoutePluginConfig `json:"config" yaml:"config"`
}
-// ApisixRouteHTTPPluginConfig is the configuration for
+// ApisixRoutePluginConfig is the configuration for
// any plugins.
-type ApisixRouteHTTPPluginConfig map[string]interface{}
+type ApisixRoutePluginConfig map[string]interface{}
// ApisixRouteAuthentication is the authentication-related
// configuration in ApisixRoute.
@@ -189,16 +189,16 @@ type ApisixRouteAuthenticationJwtAuth struct {
Cookie string `json:"cookie,omitempty" yaml:"cookie,omitempty"`
}
-func (p ApisixRouteHTTPPluginConfig) DeepCopyInto(out
*ApisixRouteHTTPPluginConfig) {
+func (p ApisixRoutePluginConfig) DeepCopyInto(out *ApisixRoutePluginConfig) {
b, _ := json.Marshal(&p)
_ = json.Unmarshal(b, out)
}
-func (p *ApisixRouteHTTPPluginConfig) DeepCopy() *ApisixRouteHTTPPluginConfig {
+func (p *ApisixRoutePluginConfig) DeepCopy() *ApisixRoutePluginConfig {
if p == nil {
return nil
}
- out := new(ApisixRouteHTTPPluginConfig)
+ out := new(ApisixRoutePluginConfig)
p.DeepCopyInto(out)
return out
}
@@ -210,6 +210,7 @@ type ApisixRouteStream struct {
Protocol string `json:"protocol" yaml:"protocol"`
Match ApisixRouteStreamMatch `json:"match" yaml:"match"`
Backend ApisixRouteStreamBackend `json:"backend" yaml:"backend"`
+ Plugins []ApisixRoutePlugin `json:"plugins,omitempty"
yaml:"plugins,omitempty"`
}
// ApisixRouteStreamMatch represents the match conditions of stream route.
@@ -691,9 +692,9 @@ type ApisixPluginConfig struct {
// ApisixPluginConfigSpec defines the desired state of ApisixPluginConfigSpec.
type ApisixPluginConfigSpec struct {
- // Plugins contains a list of ApisixRouteHTTPPlugin
+ // Plugins contains a list of ApisixRoutePlugin
// +required
- Plugins []ApisixRouteHTTPPlugin `json:"plugins" yaml:"plugins"`
+ Plugins []ApisixRoutePlugin `json:"plugins" yaml:"plugins"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
diff --git a/pkg/kube/apisix/apis/config/v2/zz_generated.deepcopy.go
b/pkg/kube/apisix/apis/config/v2/zz_generated.deepcopy.go
index 1b94ae88..63c3ea1c 100644
--- a/pkg/kube/apisix/apis/config/v2/zz_generated.deepcopy.go
+++ b/pkg/kube/apisix/apis/config/v2/zz_generated.deepcopy.go
@@ -669,7 +669,7 @@ func (in *ApisixPluginConfigSpec) DeepCopyInto(out
*ApisixPluginConfigSpec) {
*out = *in
if in.Plugins != nil {
in, out := &in.Plugins, &out.Plugins
- *out = make([]ApisixRouteHTTPPlugin, len(*in))
+ *out = make([]ApisixRoutePlugin, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
@@ -783,7 +783,7 @@ func (in *ApisixRouteHTTP) DeepCopyInto(out
*ApisixRouteHTTP) {
}
if in.Plugins != nil {
in, out := &in.Plugins, &out.Plugins
- *out = make([]ApisixRouteHTTPPlugin, len(*in))
+ *out = make([]ApisixRoutePlugin, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
@@ -910,23 +910,6 @@ func (in *ApisixRouteHTTPMatchExprSubject) DeepCopy()
*ApisixRouteHTTPMatchExprS
return out
}
-// DeepCopyInto is an autogenerated deepcopy function, copying the receiver,
writing into out. in must be non-nil.
-func (in *ApisixRouteHTTPPlugin) DeepCopyInto(out *ApisixRouteHTTPPlugin) {
- *out = *in
- in.Config.DeepCopyInto(&out.Config)
- return
-}
-
-// DeepCopy is an autogenerated deepcopy function, copying the receiver,
creating a new ApisixRouteHTTPPlugin.
-func (in *ApisixRouteHTTPPlugin) DeepCopy() *ApisixRouteHTTPPlugin {
- if in == nil {
- return nil
- }
- out := new(ApisixRouteHTTPPlugin)
- in.DeepCopyInto(out)
- return out
-}
-
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver,
writing into out. in must be non-nil.
func (in *ApisixRouteList) DeepCopyInto(out *ApisixRouteList) {
*out = *in
@@ -960,6 +943,23 @@ func (in *ApisixRouteList) DeepCopyObject() runtime.Object
{
return nil
}
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver,
writing into out. in must be non-nil.
+func (in *ApisixRoutePlugin) DeepCopyInto(out *ApisixRoutePlugin) {
+ *out = *in
+ in.Config.DeepCopyInto(&out.Config)
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver,
creating a new ApisixRoutePlugin.
+func (in *ApisixRoutePlugin) DeepCopy() *ApisixRoutePlugin {
+ if in == nil {
+ return nil
+ }
+ out := new(ApisixRoutePlugin)
+ in.DeepCopyInto(out)
+ return out
+}
+
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver,
writing into out. in must be non-nil.
func (in *ApisixRouteSpec) DeepCopyInto(out *ApisixRouteSpec) {
*out = *in
@@ -973,7 +973,9 @@ func (in *ApisixRouteSpec) DeepCopyInto(out
*ApisixRouteSpec) {
if in.Stream != nil {
in, out := &in.Stream, &out.Stream
*out = make([]ApisixRouteStream, len(*in))
- copy(*out, *in)
+ for i := range *in {
+ (*in)[i].DeepCopyInto(&(*out)[i])
+ }
}
return
}
@@ -993,6 +995,13 @@ func (in *ApisixRouteStream) DeepCopyInto(out
*ApisixRouteStream) {
*out = *in
out.Match = in.Match
out.Backend = in.Backend
+ if in.Plugins != nil {
+ in, out := &in.Plugins, &out.Plugins
+ *out = make([]ApisixRoutePlugin, len(*in))
+ for i := range *in {
+ (*in)[i].DeepCopyInto(&(*out)[i])
+ }
+ }
return
}
diff --git a/pkg/providers/apisix/translation/apisix_route.go
b/pkg/providers/apisix/translation/apisix_route.go
index bf32f733..f91ea128 100644
--- a/pkg/providers/apisix/translation/apisix_route.go
+++ b/pkg/providers/apisix/translation/apisix_route.go
@@ -818,6 +818,20 @@ func (t *translator) translateStreamRouteV2(ctx
*translation.TranslateContext, a
)
return err
}
+
+ // add stream route plugins
+ pluginMap := make(apisixv1.Plugins)
+ for _, plugin := range part.Plugins {
+ if !plugin.Enable {
+ continue
+ }
+ if plugin.Config != nil {
+ pluginMap[plugin.Name] = plugin.Config
+ } else {
+ pluginMap[plugin.Name] =
make(map[string]interface{})
+ }
+ }
+
sr := apisixv1.NewDefaultStreamRoute()
name := apisixv1.ComposeStreamRouteName(ar.Namespace, ar.Name,
part.Name)
sr.ID = id.GenID(name)
@@ -827,6 +841,7 @@ func (t *translator) translateStreamRouteV2(ctx
*translation.TranslateContext, a
return err
}
sr.UpstreamId = ups.ID
+ sr.Plugins = pluginMap
ctx.AddStreamRoute(sr)
if !ctx.CheckUpstreamExist(ups.Name) {
ctx.AddUpstream(ups)
diff --git a/pkg/types/apisix/v1/types.go b/pkg/types/apisix/v1/types.go
index 8ffcd0d3..ea326adf 100644
--- a/pkg/types/apisix/v1/types.go
+++ b/pkg/types/apisix/v1/types.go
@@ -372,6 +372,7 @@ type StreamRoute struct {
SNI string `json:"sni,omitempty" yaml:"sni,omitempty"`
UpstreamId string `json:"upstream_id,omitempty"
yaml:"upstream_id,omitempty"`
Upstream *Upstream `json:"upstream,omitempty"
yaml:"upstream,omitempty"`
+ Plugins Plugins `json:"plugins,omitempty"
yaml:"plugins,omitempty"`
}
// GlobalRule represents the global_rule object in APISIX.
diff --git a/pkg/types/apisix/v1/zz_generated.deepcopy.go
b/pkg/types/apisix/v1/zz_generated.deepcopy.go
index 8da11e6f..a353718b 100644
--- a/pkg/types/apisix/v1/zz_generated.deepcopy.go
+++ b/pkg/types/apisix/v1/zz_generated.deepcopy.go
@@ -465,6 +465,7 @@ func (in *StreamRoute) DeepCopyInto(out *StreamRoute) {
*out = new(Upstream)
(*in).DeepCopyInto(*out)
}
+ in.Plugins.DeepCopyInto(&out.Plugins)
return
}
diff --git a/samples/deploy/crd/v1/ApisixRoute.yaml
b/samples/deploy/crd/v1/ApisixRoute.yaml
index bccb647c..cf0f8e58 100644
--- a/samples/deploy/crd/v1/ApisixRoute.yaml
+++ b/samples/deploy/crd/v1/ApisixRoute.yaml
@@ -821,6 +821,22 @@ spec:
required:
- serviceName
- servicePort
+ plugins:
+ type: array
+ items:
+ type: object
+ properties:
+ name:
+ type: string
+ minLength: 1
+ enable:
+ type: boolean
+ config:
+ type: object
+ x-kubernetes-preserve-unknown-fields: true # we
have to enable it since plugin config is free-form and differs per plugin
+ required:
+ - name
+ - enable
status:
type: object
properties:
diff --git a/test/e2e/go.mod b/test/e2e/go.mod
index 2c7fef4e..846ff781 100644
--- a/test/e2e/go.mod
+++ b/test/e2e/go.mod
@@ -5,6 +5,7 @@ go 1.18
require (
github.com/apache/apisix-ingress-controller
v0.0.0-20210105024109-72e53386de5a
github.com/apache/apisix-ingress-controller/test/e2e/testbackend v0.0.0
+ github.com/eclipse/paho.mqtt.golang v1.3.5
github.com/gavv/httpexpect/v2 v2.3.1
github.com/gorilla/websocket v1.5.0
github.com/gruntwork-io/terratest v0.40.19
diff --git a/test/e2e/go.sum b/test/e2e/go.sum
index deab836f..287a6e55 100644
--- a/test/e2e/go.sum
+++ b/test/e2e/go.sum
@@ -101,6 +101,11 @@ github.com/davecgh/go-spew v1.1.0/go.mod
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod
h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
+github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod
h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
+github.com/dustin/go-humanize v1.0.0/go.mod
h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
+github.com/eclipse/paho.mqtt.golang v1.3.5
h1:sWtmgNxYM9P2sP+xEItMozsR3w0cqZFlqnNN1bdl41Y=
+github.com/eclipse/paho.mqtt.golang v1.3.5/go.mod
h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc=
+github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod
h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod
h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/elazarl/goproxy v0.0.0-20190911111923-ecfe977594f1
h1:yY9rWGoXv1U5pl4gxqlULARMQD7x0QG85lqEXTWysik=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod
h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
@@ -574,6 +579,7 @@ golang.org/x/net v0.0.0-20200222125558-5a598a2470a0/go.mod
h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200301022130-244492dfa37a/go.mod
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod
h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
+golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod
h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200501053045-e0ff5e5a1de5/go.mod
h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200506145744-7e3656a0809f/go.mod
h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200513185701-a91f0712d120/go.mod
h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
diff --git a/test/e2e/scaffold/scaffold.go b/test/e2e/scaffold/scaffold.go
index a8b97be9..b11a33d8 100644
--- a/test/e2e/scaffold/scaffold.go
+++ b/test/e2e/scaffold/scaffold.go
@@ -34,6 +34,7 @@ import (
"time"
"github.com/apache/apisix-ingress-controller/pkg/config"
+ mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/gavv/httpexpect/v2"
"github.com/gruntwork-io/terratest/modules/k8s"
"github.com/gruntwork-io/terratest/modules/testing"
@@ -283,6 +284,13 @@ func (s *Scaffold) NewAPISIXClientWithTLSOverTCP(host
string) *httpexpect.Expect
})
}
+func (s *Scaffold) NewMQTTClient() mqtt.Client {
+ opts := mqtt.NewClientOptions()
+ opts.AddBroker(fmt.Sprintf("tcp://%s", s.apisixTCPTunnel.Endpoint()))
+ client := mqtt.NewClient(opts)
+ return client
+}
+
func (s *Scaffold) DNSResolver() *net.Resolver {
return &net.Resolver{
PreferGo: false,
diff --git a/test/e2e/suite-plugins/suite-plugins-other/mqtt-proxy.go
b/test/e2e/suite-plugins/suite-plugins-other/mqtt-proxy.go
new file mode 100644
index 00000000..b906a280
--- /dev/null
+++ b/test/e2e/suite-plugins/suite-plugins-other/mqtt-proxy.go
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package plugins
+
+import (
+ "time"
+
+ ginkgo "github.com/onsi/ginkgo/v2"
+ "github.com/stretchr/testify/assert"
+
+ "github.com/apache/apisix-ingress-controller/test/e2e/scaffold"
+)
+
+var _ = ginkgo.Describe("suite-plugins-other: mqtt-proxy plugin", func() {
+ opts := &scaffold.Options{
+ Name: "mqtt-proxy",
+ IngressAPISIXReplicas: 1,
+ ApisixResourceVersion: scaffold.ApisixResourceVersion().V2,
+ }
+ s := scaffold.NewScaffold(opts)
+ // set up the MQTT broker (eclipse-mosquitto) Deployment and Service
+ ginkgo.It("stream mqtt proxy", func() {
+ assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(`
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+ name: mosquito
+spec:
+ replicas: 1
+ selector:
+ matchLabels:
+ app: mosquito
+ template:
+ metadata:
+ labels:
+ app: mosquito
+ spec:
+ containers:
+ - name: mosquito
+ image: eclipse-mosquitto:1.6
+ livenessProbe:
+ tcpSocket:
+ port: 1883
+ initialDelaySeconds: 5
+ periodSeconds: 10
+ readinessProbe:
+ tcpSocket:
+ port: 1883
+ initialDelaySeconds: 5
+ periodSeconds: 10
+ ports:
+ - name: mosquito
+ containerPort: 1883
+ protocol: TCP
+`))
+ assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(`
+apiVersion: v1
+kind: Service
+metadata:
+ name: mosquito
+spec:
+ selector:
+ app: mosquito
+ type: ClusterIP
+ ports:
+ - port: 1883
+ targetPort: 1883
+ protocol: TCP
+`))
+ s.EnsureNumEndpointsReady(ginkgo.GinkgoT(), "mosquito", 1)
+ // set up the ApisixRoute for the mqtt-proxy plugin
+ apisixRoute := `
+apiVersion: apisix.apache.org/v2
+kind: ApisixRoute
+metadata:
+ name: mqtt-route
+spec:
+ stream:
+ - name: rule1
+ protocol: TCP
+ match:
+ ingressPort: 9100
+ backend:
+ serviceName: mosquito
+ servicePort: 1883
+ plugins:
+ - name: mqtt-proxy
+ enable: true
+ config:
+ protocol_name: MQTT
+ protocol_level: 4
+`
+
+ assert.Nil(ginkgo.GinkgoT(),
s.CreateVersionedApisixResource(apisixRoute))
+
+ err := s.EnsureNumApisixStreamRoutesCreated(1)
+ assert.Nil(ginkgo.GinkgoT(), err, "Checking number of routes")
+
+ sr, err := s.ListApisixStreamRoutes()
+ assert.Nil(ginkgo.GinkgoT(), err)
+ assert.Len(ginkgo.GinkgoT(), sr, 1)
+ assert.Equal(ginkgo.GinkgoT(), sr[0].ServerPort, int32(9100))
+ // test mqtt protocol
+ c := s.NewMQTTClient()
+ token := c.Connect()
+ token.WaitTimeout(3 * time.Second)
+ assert.Nil(ginkgo.GinkgoT(), token.Error(), "Checking mqtt
connection")
+ })
+})