This is an automated email from the ASF dual-hosted git repository.

zhangjintao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git


The following commit(s) were added to refs/heads/master by this push:
     new 530ce52b feat: add mqtt-proxy plugin in ApisixRoute (#1056)
530ce52b is described below

commit 530ce52b278d7e385db3747d33e0d9fe1db0f3d1
Author: lsy <[email protected]>
AuthorDate: Sun Aug 21 22:20:15 2022 +0800

    feat: add mqtt-proxy plugin in ApisixRoute (#1056)
---
 .github/workflows/spell-checker.yml                |   2 +-
 docs/en/latest/references/apisix_route_v2.md       |   4 +
 pkg/api/validation/apisix_route_test.go            |   2 +-
 pkg/kube/apisix/apis/config/v2/types.go            |  23 ++--
 .../apisix/apis/config/v2/zz_generated.deepcopy.go |  49 +++++----
 pkg/providers/apisix/translation/apisix_route.go   |  15 +++
 pkg/types/apisix/v1/types.go                       |   1 +
 pkg/types/apisix/v1/zz_generated.deepcopy.go       |   1 +
 samples/deploy/crd/v1/ApisixRoute.yaml             |  16 +++
 test/e2e/go.mod                                    |   1 +
 test/e2e/go.sum                                    |   6 +
 test/e2e/scaffold/scaffold.go                      |   8 ++
 .../suite-plugins-other/mqtt-proxy.go              | 122 +++++++++++++++++++++
 13 files changed, 217 insertions(+), 33 deletions(-)

diff --git a/.github/workflows/spell-checker.yml 
b/.github/workflows/spell-checker.yml
index ef1d159a..074f3bda 100644
--- a/.github/workflows/spell-checker.yml
+++ b/.github/workflows/spell-checker.yml
@@ -35,5 +35,5 @@ jobs:
           wget -O - -q https://git.io/misspell | sh -s -- -b .
       - name: Misspell
         run: |
-          find . -name "*.go" -type f | xargs ./misspell -error
+          find . -name "*.go" -type f | xargs ./misspell -i mosquitto -error
           find docs -type f | xargs ./misspell -error
diff --git a/docs/en/latest/references/apisix_route_v2.md 
b/docs/en/latest/references/apisix_route_v2.md
index 341b9cbe..6deb9c23 100644
--- a/docs/en/latest/references/apisix_route_v2.md
+++ b/docs/en/latest/references/apisix_route_v2.md
@@ -73,6 +73,10 @@ Meaning of each field in the spec of ApisixRoute are 
followed, the top level fie
 | stream[].backend.servicePort         | integer or string  | The backend 
service port, can be the port number or the name defined in the service object. 
                                                                                
                                                      |
 | stream[].backend.resolveGranularity  | string             | See [Service 
Resolve Granularity](#service-resolve-granularity) for the details.             
                                                                                
                                                     |
 | stream[].backend.subset              | string             | Subset specifies 
a subset for the target Service. The subset should be pre-definedin 
ApisixUpstream about this service.                                              
                                                             |
+| stream[].plugins                     | array              | A series of 
APISIX plugins that will be executed once this route rule is matched            
                                                                                
                                                      |
+| stream[].plugins[].name              | string             | The plugin name, 
see [docs](http://apisix.apache.org/docs/apisix/getting-started) for learning 
the available plugins.                                                          
                                                   |
+| stream[].plugins[].enable            | boolean            | Whether the 
plugin would be used                                                            
                                                                                
                                                      |
+| stream[].plugins[].config            | object             | The 
configuration of the plugin that must have the same fields as in APISIX.        
                                                                                
                                                              |
 
 ## Expression Operators
 
diff --git a/pkg/api/validation/apisix_route_test.go 
b/pkg/api/validation/apisix_route_test.go
index 7249aa93..f51ba0a7 100644
--- a/pkg/api/validation/apisix_route_test.go
+++ b/pkg/api/validation/apisix_route_test.go
@@ -124,7 +124,7 @@ func Test_validatePlugin(t *testing.T) {
        fakeClient := newFakeSchemaClient()
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
-                       gotValid, _ := validatePlugin(fakeClient, 
tt.pluginName, v2.ApisixRouteHTTPPluginConfig(tt.pluginConfig))
+                       gotValid, _ := validatePlugin(fakeClient, 
tt.pluginName, v2.ApisixRoutePluginConfig(tt.pluginConfig))
                        if gotValid != tt.wantValid {
                                t.Errorf("validatePlugin() gotValid = %v, want 
%v", gotValid, tt.wantValid)
                        }
diff --git a/pkg/kube/apisix/apis/config/v2/types.go 
b/pkg/kube/apisix/apis/config/v2/types.go
index 6adc5c09..666d7b6a 100644
--- a/pkg/kube/apisix/apis/config/v2/types.go
+++ b/pkg/kube/apisix/apis/config/v2/types.go
@@ -70,7 +70,7 @@ type ApisixRouteHTTP struct {
        Backends         []ApisixRouteHTTPBackend  `json:"backends,omitempty" 
yaml:"backends,omitempty"`
        Websocket        bool                      `json:"websocket" 
yaml:"websocket"`
        PluginConfigName string                    
`json:"plugin_config_name,omitempty" yaml:"plugin_config_name,omitempty"`
-       Plugins          []ApisixRouteHTTPPlugin   `json:"plugins,omitempty" 
yaml:"plugins,omitempty"`
+       Plugins          []ApisixRoutePlugin       `json:"plugins,omitempty" 
yaml:"plugins,omitempty"`
        Authentication   ApisixRouteAuthentication 
`json:"authentication,omitempty" yaml:"authentication,omitempty"`
 }
 
@@ -152,19 +152,19 @@ type ApisixRouteHTTPMatchExprSubject struct {
        Name string `json:"name" yaml:"name"`
 }
 
-// ApisixRouteHTTPPlugin represents an APISIX plugin.
-type ApisixRouteHTTPPlugin struct {
+// ApisixRoutePlugin represents an APISIX plugin.
+type ApisixRoutePlugin struct {
        // The plugin name.
        Name string `json:"name" yaml:"name"`
        // Whether this plugin is in use, default is true.
        Enable bool `json:"enable" yaml:"enable"`
        // Plugin configuration.
-       Config ApisixRouteHTTPPluginConfig `json:"config" yaml:"config"`
+       Config ApisixRoutePluginConfig `json:"config" yaml:"config"`
 }
 
-// ApisixRouteHTTPPluginConfig is the configuration for
+// ApisixRoutePluginConfig is the configuration for
 // any plugins.
-type ApisixRouteHTTPPluginConfig map[string]interface{}
+type ApisixRoutePluginConfig map[string]interface{}
 
 // ApisixRouteAuthentication is the authentication-related
 // configuration in ApisixRoute.
@@ -189,16 +189,16 @@ type ApisixRouteAuthenticationJwtAuth struct {
        Cookie string `json:"cookie,omitempty" yaml:"cookie,omitempty"`
 }
 
-func (p ApisixRouteHTTPPluginConfig) DeepCopyInto(out 
*ApisixRouteHTTPPluginConfig) {
+func (p ApisixRoutePluginConfig) DeepCopyInto(out *ApisixRoutePluginConfig) {
        b, _ := json.Marshal(&p)
        _ = json.Unmarshal(b, out)
 }
 
-func (p *ApisixRouteHTTPPluginConfig) DeepCopy() *ApisixRouteHTTPPluginConfig {
+func (p *ApisixRoutePluginConfig) DeepCopy() *ApisixRoutePluginConfig {
        if p == nil {
                return nil
        }
-       out := new(ApisixRouteHTTPPluginConfig)
+       out := new(ApisixRoutePluginConfig)
        p.DeepCopyInto(out)
        return out
 }
@@ -210,6 +210,7 @@ type ApisixRouteStream struct {
        Protocol string                   `json:"protocol" yaml:"protocol"`
        Match    ApisixRouteStreamMatch   `json:"match" yaml:"match"`
        Backend  ApisixRouteStreamBackend `json:"backend" yaml:"backend"`
+       Plugins  []ApisixRoutePlugin      `json:"plugins,omitempty" 
yaml:"plugins,omitempty"`
 }
 
 // ApisixRouteStreamMatch represents the match conditions of stream route.
@@ -691,9 +692,9 @@ type ApisixPluginConfig struct {
 
 // ApisixPluginConfigSpec defines the desired state of ApisixPluginConfigSpec.
 type ApisixPluginConfigSpec struct {
-       // Plugins contains a list of ApisixRouteHTTPPlugin
+       // Plugins contains a list of ApisixRoutePlugin
        // +required
-       Plugins []ApisixRouteHTTPPlugin `json:"plugins" yaml:"plugins"`
+       Plugins []ApisixRoutePlugin `json:"plugins" yaml:"plugins"`
 }
 
 // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
diff --git a/pkg/kube/apisix/apis/config/v2/zz_generated.deepcopy.go 
b/pkg/kube/apisix/apis/config/v2/zz_generated.deepcopy.go
index 1b94ae88..63c3ea1c 100644
--- a/pkg/kube/apisix/apis/config/v2/zz_generated.deepcopy.go
+++ b/pkg/kube/apisix/apis/config/v2/zz_generated.deepcopy.go
@@ -669,7 +669,7 @@ func (in *ApisixPluginConfigSpec) DeepCopyInto(out 
*ApisixPluginConfigSpec) {
        *out = *in
        if in.Plugins != nil {
                in, out := &in.Plugins, &out.Plugins
-               *out = make([]ApisixRouteHTTPPlugin, len(*in))
+               *out = make([]ApisixRoutePlugin, len(*in))
                for i := range *in {
                        (*in)[i].DeepCopyInto(&(*out)[i])
                }
@@ -783,7 +783,7 @@ func (in *ApisixRouteHTTP) DeepCopyInto(out 
*ApisixRouteHTTP) {
        }
        if in.Plugins != nil {
                in, out := &in.Plugins, &out.Plugins
-               *out = make([]ApisixRouteHTTPPlugin, len(*in))
+               *out = make([]ApisixRoutePlugin, len(*in))
                for i := range *in {
                        (*in)[i].DeepCopyInto(&(*out)[i])
                }
@@ -910,23 +910,6 @@ func (in *ApisixRouteHTTPMatchExprSubject) DeepCopy() 
*ApisixRouteHTTPMatchExprS
        return out
 }
 
-// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, 
writing into out. in must be non-nil.
-func (in *ApisixRouteHTTPPlugin) DeepCopyInto(out *ApisixRouteHTTPPlugin) {
-       *out = *in
-       in.Config.DeepCopyInto(&out.Config)
-       return
-}
-
-// DeepCopy is an autogenerated deepcopy function, copying the receiver, 
creating a new ApisixRouteHTTPPlugin.
-func (in *ApisixRouteHTTPPlugin) DeepCopy() *ApisixRouteHTTPPlugin {
-       if in == nil {
-               return nil
-       }
-       out := new(ApisixRouteHTTPPlugin)
-       in.DeepCopyInto(out)
-       return out
-}
-
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, 
writing into out. in must be non-nil.
 func (in *ApisixRouteList) DeepCopyInto(out *ApisixRouteList) {
        *out = *in
@@ -960,6 +943,23 @@ func (in *ApisixRouteList) DeepCopyObject() runtime.Object 
{
        return nil
 }
 
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, 
writing into out. in must be non-nil.
+func (in *ApisixRoutePlugin) DeepCopyInto(out *ApisixRoutePlugin) {
+       *out = *in
+       in.Config.DeepCopyInto(&out.Config)
+       return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, 
creating a new ApisixRoutePlugin.
+func (in *ApisixRoutePlugin) DeepCopy() *ApisixRoutePlugin {
+       if in == nil {
+               return nil
+       }
+       out := new(ApisixRoutePlugin)
+       in.DeepCopyInto(out)
+       return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, 
writing into out. in must be non-nil.
 func (in *ApisixRouteSpec) DeepCopyInto(out *ApisixRouteSpec) {
        *out = *in
@@ -973,7 +973,9 @@ func (in *ApisixRouteSpec) DeepCopyInto(out 
*ApisixRouteSpec) {
        if in.Stream != nil {
                in, out := &in.Stream, &out.Stream
                *out = make([]ApisixRouteStream, len(*in))
-               copy(*out, *in)
+               for i := range *in {
+                       (*in)[i].DeepCopyInto(&(*out)[i])
+               }
        }
        return
 }
@@ -993,6 +995,13 @@ func (in *ApisixRouteStream) DeepCopyInto(out 
*ApisixRouteStream) {
        *out = *in
        out.Match = in.Match
        out.Backend = in.Backend
+       if in.Plugins != nil {
+               in, out := &in.Plugins, &out.Plugins
+               *out = make([]ApisixRoutePlugin, len(*in))
+               for i := range *in {
+                       (*in)[i].DeepCopyInto(&(*out)[i])
+               }
+       }
        return
 }
 
diff --git a/pkg/providers/apisix/translation/apisix_route.go 
b/pkg/providers/apisix/translation/apisix_route.go
index bf32f733..f91ea128 100644
--- a/pkg/providers/apisix/translation/apisix_route.go
+++ b/pkg/providers/apisix/translation/apisix_route.go
@@ -818,6 +818,20 @@ func (t *translator) translateStreamRouteV2(ctx 
*translation.TranslateContext, a
                        )
                        return err
                }
+
+               // add stream route plugins
+               pluginMap := make(apisixv1.Plugins)
+               for _, plugin := range part.Plugins {
+                       if !plugin.Enable {
+                               continue
+                       }
+                       if plugin.Config != nil {
+                               pluginMap[plugin.Name] = plugin.Config
+                       } else {
+                               pluginMap[plugin.Name] = 
make(map[string]interface{})
+                       }
+               }
+
                sr := apisixv1.NewDefaultStreamRoute()
                name := apisixv1.ComposeStreamRouteName(ar.Namespace, ar.Name, 
part.Name)
                sr.ID = id.GenID(name)
@@ -827,6 +841,7 @@ func (t *translator) translateStreamRouteV2(ctx 
*translation.TranslateContext, a
                        return err
                }
                sr.UpstreamId = ups.ID
+               sr.Plugins = pluginMap
                ctx.AddStreamRoute(sr)
                if !ctx.CheckUpstreamExist(ups.Name) {
                        ctx.AddUpstream(ups)
diff --git a/pkg/types/apisix/v1/types.go b/pkg/types/apisix/v1/types.go
index 8ffcd0d3..ea326adf 100644
--- a/pkg/types/apisix/v1/types.go
+++ b/pkg/types/apisix/v1/types.go
@@ -372,6 +372,7 @@ type StreamRoute struct {
        SNI        string            `json:"sni,omitempty" yaml:"sni,omitempty"`
        UpstreamId string            `json:"upstream_id,omitempty" 
yaml:"upstream_id,omitempty"`
        Upstream   *Upstream         `json:"upstream,omitempty" 
yaml:"upstream,omitempty"`
+       Plugins    Plugins           `json:"plugins,omitempty" 
yaml:"plugins,omitempty"`
 }
 
 // GlobalRule represents the global_rule object in APISIX.
diff --git a/pkg/types/apisix/v1/zz_generated.deepcopy.go 
b/pkg/types/apisix/v1/zz_generated.deepcopy.go
index 8da11e6f..a353718b 100644
--- a/pkg/types/apisix/v1/zz_generated.deepcopy.go
+++ b/pkg/types/apisix/v1/zz_generated.deepcopy.go
@@ -465,6 +465,7 @@ func (in *StreamRoute) DeepCopyInto(out *StreamRoute) {
                *out = new(Upstream)
                (*in).DeepCopyInto(*out)
        }
+       in.Plugins.DeepCopyInto(&out.Plugins)
        return
 }
 
diff --git a/samples/deploy/crd/v1/ApisixRoute.yaml 
b/samples/deploy/crd/v1/ApisixRoute.yaml
index bccb647c..cf0f8e58 100644
--- a/samples/deploy/crd/v1/ApisixRoute.yaml
+++ b/samples/deploy/crd/v1/ApisixRoute.yaml
@@ -821,6 +821,22 @@ spec:
                         required:
                           - serviceName
                           - servicePort
+                      plugins:
+                        type: array
+                        items:
+                          type: object
+                          properties:
+                            name:
+                              type: string
+                              minLength: 1
+                            enable:
+                              type: boolean
+                            config:
+                              type: object
+                              x-kubernetes-preserve-unknown-fields: true # we 
have to enable it since plugin config
+                        required:
+                          - name
+                          - enable
             status:
               type: object
               properties:
diff --git a/test/e2e/go.mod b/test/e2e/go.mod
index 2c7fef4e..846ff781 100644
--- a/test/e2e/go.mod
+++ b/test/e2e/go.mod
@@ -5,6 +5,7 @@ go 1.18
 require (
        github.com/apache/apisix-ingress-controller 
v0.0.0-20210105024109-72e53386de5a
        github.com/apache/apisix-ingress-controller/test/e2e/testbackend v0.0.0
+       github.com/eclipse/paho.mqtt.golang v1.3.5
        github.com/gavv/httpexpect/v2 v2.3.1
        github.com/gorilla/websocket v1.5.0
        github.com/gruntwork-io/terratest v0.40.19
diff --git a/test/e2e/go.sum b/test/e2e/go.sum
index deab836f..287a6e55 100644
--- a/test/e2e/go.sum
+++ b/test/e2e/go.sum
@@ -101,6 +101,11 @@ github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
 github.com/davecgh/go-spew v1.1.1 
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod 
h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
+github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod 
h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
+github.com/dustin/go-humanize v1.0.0/go.mod 
h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
+github.com/eclipse/paho.mqtt.golang v1.3.5 
h1:sWtmgNxYM9P2sP+xEItMozsR3w0cqZFlqnNN1bdl41Y=
+github.com/eclipse/paho.mqtt.golang v1.3.5/go.mod 
h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc=
+github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod 
h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
 github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod 
h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
 github.com/elazarl/goproxy v0.0.0-20190911111923-ecfe977594f1 
h1:yY9rWGoXv1U5pl4gxqlULARMQD7x0QG85lqEXTWysik=
 github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod 
h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
@@ -574,6 +579,7 @@ golang.org/x/net v0.0.0-20200222125558-5a598a2470a0/go.mod 
h1:z5CRVTTTmAJ677TzLL
 golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod 
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200301022130-244492dfa37a/go.mod 
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod 
h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
+golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod 
h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
 golang.org/x/net v0.0.0-20200501053045-e0ff5e5a1de5/go.mod 
h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
 golang.org/x/net v0.0.0-20200506145744-7e3656a0809f/go.mod 
h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
 golang.org/x/net v0.0.0-20200513185701-a91f0712d120/go.mod 
h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
diff --git a/test/e2e/scaffold/scaffold.go b/test/e2e/scaffold/scaffold.go
index a8b97be9..b11a33d8 100644
--- a/test/e2e/scaffold/scaffold.go
+++ b/test/e2e/scaffold/scaffold.go
@@ -34,6 +34,7 @@ import (
        "time"
 
        "github.com/apache/apisix-ingress-controller/pkg/config"
+       mqtt "github.com/eclipse/paho.mqtt.golang"
        "github.com/gavv/httpexpect/v2"
        "github.com/gruntwork-io/terratest/modules/k8s"
        "github.com/gruntwork-io/terratest/modules/testing"
@@ -283,6 +284,13 @@ func (s *Scaffold) NewAPISIXClientWithTLSOverTCP(host 
string) *httpexpect.Expect
        })
 }
 
+func (s *Scaffold) NewMQTTClient() mqtt.Client {
+       opts := mqtt.NewClientOptions()
+       opts.AddBroker(fmt.Sprintf("tcp://%s", s.apisixTCPTunnel.Endpoint()))
+       client := mqtt.NewClient(opts)
+       return client
+}
+
 func (s *Scaffold) DNSResolver() *net.Resolver {
        return &net.Resolver{
                PreferGo: false,
diff --git a/test/e2e/suite-plugins/suite-plugins-other/mqtt-proxy.go 
b/test/e2e/suite-plugins/suite-plugins-other/mqtt-proxy.go
new file mode 100644
index 00000000..b906a280
--- /dev/null
+++ b/test/e2e/suite-plugins/suite-plugins-other/mqtt-proxy.go
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package plugins
+
+import (
+       "time"
+
+       ginkgo "github.com/onsi/ginkgo/v2"
+       "github.com/stretchr/testify/assert"
+
+       "github.com/apache/apisix-ingress-controller/test/e2e/scaffold"
+)
+
+var _ = ginkgo.Describe("suite-plugins-other: mqtt-proxy plugin", func() {
+       opts := &scaffold.Options{
+               Name:                  "mqtt-proxy",
+               IngressAPISIXReplicas: 1,
+               ApisixResourceVersion: scaffold.ApisixResourceVersion().V2,
+       }
+       s := scaffold.NewScaffold(opts)
+       // setup mosquito service
+       ginkgo.It("stream mqtt proxy", func() {
+               assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(`
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: mosquito
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: mosquito
+  template:
+    metadata:
+      labels:
+        app: mosquito
+    spec:
+      containers:
+      - name: mosquito
+        image: eclipse-mosquitto:1.6
+        livenessProbe:
+          tcpSocket:
+            port: 1883
+          initialDelaySeconds: 5
+          periodSeconds: 10
+        readinessProbe:
+          tcpSocket:
+            port: 1883
+          initialDelaySeconds: 5
+          periodSeconds: 10
+        ports:
+        - name: mosquito
+          containerPort: 1883
+          protocol: TCP
+`))
+               assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(`
+apiVersion: v1
+kind: Service
+metadata:
+  name: mosquito
+spec:
+  selector:
+    app: mosquito
+  type: ClusterIP
+  ports:
+  - port: 1883
+    targetPort: 1883
+    protocol: TCP
+`))
+               s.EnsureNumEndpointsReady(ginkgo.GinkgoT(), "mosquito", 1)
+               // setup Apisix Route for mqtt proxy
+               apisixRoute := `
+apiVersion: apisix.apache.org/v2
+kind: ApisixRoute
+metadata:
+  name: mqtt-route
+spec:
+  stream:
+  - name: rule1
+    protocol: TCP
+    match:
+      ingressPort: 9100
+    backend:
+      serviceName: mosquito
+      servicePort: 1883
+    plugins:
+    - name: mqtt-proxy
+      enable: true
+      config:
+        protocol_name: MQTT
+        protocol_level: 4
+`
+
+               assert.Nil(ginkgo.GinkgoT(), 
s.CreateVersionedApisixResource(apisixRoute))
+
+               err := s.EnsureNumApisixStreamRoutesCreated(1)
+               assert.Nil(ginkgo.GinkgoT(), err, "Checking number of routes")
+
+               sr, err := s.ListApisixStreamRoutes()
+               assert.Nil(ginkgo.GinkgoT(), err)
+               assert.Len(ginkgo.GinkgoT(), sr, 1)
+               assert.Equal(ginkgo.GinkgoT(), sr[0].ServerPort, int32(9100))
+               // test mqtt protocol
+               c := s.NewMQTTClient()
+               token := c.Connect()
+               token.WaitTimeout(3 * time.Second)
+               assert.Nil(ginkgo.GinkgoT(), token.Error(), "Checking mqtt 
connection")
+       })
+})

Reply via email to