This is an automated email from the ASF dual-hosted git repository.

kvn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git


The following commit(s) were added to refs/heads/master by this push:
     new 1669bba  chore: namespaces filtering (#162)
1669bba is described below

commit 1669bba20ad775fd4ec162fb33fce87b407d4204
Author: Alex Zhang <[email protected]>
AuthorDate: Mon Jan 11 18:48:02 2021 +0800

    chore: namespaces filtering (#162)
    
    * chore: namespaces filtering
    
    * feat: run e2e cases in parallel
    
    * test: add e2e case to cover the namespacing filtering feature
    
    * test: always build apisix-ingress-controller from current branch before running e2e cases
    
    * fix: broken e2e test cases
---
 Makefile                                  | 15 ++++--
 cmd/ingress/ingress.go                    |  5 ++
 conf/config-default.yaml                  |  2 +
 pkg/config/config.go                      | 24 +++++++++-
 pkg/config/config_test.go                 |  1 +
 pkg/ingress/controller/apisix_route.go    | 16 ++++++-
 pkg/ingress/controller/apisix_service.go  | 16 ++++++-
 pkg/ingress/controller/apisix_tls.go      | 16 ++++++-
 pkg/ingress/controller/apisix_upstream.go | 13 +++++-
 pkg/ingress/controller/controller.go      | 66 ++++++++++++++++++++------
 pkg/ingress/controller/endpoint.go        | 10 +++-
 test/e2e/ingress/namespace.go             | 77 +++++++++++++++++++++++++++++++
 test/e2e/ingress/resourcepushing.go       | 38 +++++++--------
 test/e2e/scaffold/crd.go                  | 29 ++++++++++--
 test/e2e/scaffold/ingress.go              | 15 +++---
 test/e2e/scaffold/scaffold.go             | 22 ++++-----
 16 files changed, 296 insertions(+), 69 deletions(-)

diff --git a/Makefile b/Makefile
index 679cf14..efd28c7 100644
--- a/Makefile
+++ b/Makefile
@@ -17,7 +17,7 @@
 default: help
 
 VERSION ?= 0.1.0
-IMAGE_TAG ?= "latest"
+IMAGE_TAG ?= "dev"
 GITSHA ?= $(shell git rev-parse --short=7 HEAD)
 OSNAME ?= $(shell uname -s | tr A-Z a-z)
 OSARCH ?= $(shell uname -m | tr A-Z a-z)
@@ -40,7 +40,7 @@ build:
 
 ### build-image:      Build apisix-ingress-controller image
 build-image:
-       docker build -t apisix-ingress-controller:$(IMAGE_TAG) .
+       docker build -t apache/apisix-ingress-controller:$(IMAGE_TAG) .
 
 ### lint:             Do static lint check
 lint:
@@ -54,14 +54,21 @@ gofmt:
 unit-test:
        go test -cover -coverprofile=coverage.txt ./...
 
-### e2e-test:         Run e2e test cases
-e2e-test:
+### e2e-test:         Run e2e test cases (minikube is required)
+e2e-test: build-image-to-minikube
        export APISIX_ROUTE_DEF=$(PWD)/samples/deploy/crd/v1beta1/ApisixRoute.yaml && \
        export APISIX_UPSTREAM_DEF=$(PWD)/samples/deploy/crd/v1beta1/ApisixUpstream.yaml && \
        export APISIX_SERVICE_DEF=$(PWD)/samples/deploy/crd/v1beta1/ApisixService.yaml && \
        export APISIX_TLS_DEF=$(PWD)/samples/deploy/crd/v1beta1/ApisixTls.yaml && \
        cd test/e2e && ginkgo -cover -coverprofile=coverage.txt -r --randomizeSuites --randomizeAllSpecs --trace -p --nodes=1
 
+# build images to minikube node directly, it's an internal directive, so don't
+# expose it's help message.
+build-image-to-minikube:
+       @minikube version > /dev/null 2>&1 || (echo "ERROR: minikube is required."; exit 1)
+       @eval $$(minikube docker-env);\
+       docker build -t apache/apisix-ingress-controller:$(IMAGE_TAG) .
+
 ### license-check:    Do Apache License Header check
 license-check:
 ifeq ("$(wildcard .actions/openwhisk-utilities/scancode/scanCode.py)", "")
diff --git a/cmd/ingress/ingress.go b/cmd/ingress/ingress.go
index 9cac72f..66a7748 100644
--- a/cmd/ingress/ingress.go
+++ b/cmd/ingress/ingress.go
@@ -82,6 +82,10 @@ the apisix cluster and others are created`,
                                }
                                cfg = c
                        }
+                       if err := cfg.Validate(); err != nil {
+                               dief("bad configuration: %s", err)
+                       }
+
                        logger, err := log.NewLogger(
                                log.WithLogLevel(cfg.LogLevel),
                                log.WithOutputFile(cfg.LogOutput),
@@ -121,6 +125,7 @@ the apisix cluster and others are created`,
        cmd.PersistentFlags().BoolVar(&cfg.EnableProfiling, "enable-profiling", true, "enable profiling via web interface host:port/debug/pprof")
        cmd.PersistentFlags().StringVar(&cfg.Kubernetes.Kubeconfig, "kubeconfig", "", "Kubernetes configuration file (by default in-cluster configuration will be used)")
        cmd.PersistentFlags().DurationVar(&cfg.Kubernetes.ResyncInterval.Duration, "resync-interval", time.Minute, "the controller resync (with Kubernetes) interval, the minimum resync interval is 30s")
+       cmd.PersistentFlags().StringSliceVar(&cfg.Kubernetes.AppNamespaces, "app-namespace", []string{config.NamespaceAll}, "namespaces that controller will watch for resources")
        cmd.PersistentFlags().StringVar(&cfg.APISIX.BaseURL, "apisix-base-url", "", "the base URL for APISIX admin api / manager api")
        cmd.PersistentFlags().StringVar(&cfg.APISIX.AdminKey, "apisix-admin-key", "", "admin key used for the authorization of APISIX admin api / manager api")
 
diff --git a/conf/config-default.yaml b/conf/config-default.yaml
index 4ae211d..5e34911 100644
--- a/conf/config-default.yaml
+++ b/conf/config-default.yaml
@@ -38,6 +38,8 @@ kubernetes:
   resync_interval: "6h" # how long should apisix-ingress-controller
                          # re-synchronizes with Kubernetes, default is 6h,
                          # and the minimal resync interval is 30s.
+  app_namespaces: ["*"]  # namespace list that controller will watch for resources,
+                         # by default all namespaces (represented by "*") are watched.
 
 # APISIX related configurations.
 apisix:
diff --git a/pkg/config/config.go b/pkg/config/config.go
index 75d00a4..4a00383 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -22,11 +22,15 @@ import (
        "time"
 
        "gopkg.in/yaml.v2"
+       v1 "k8s.io/api/core/v1"
 
        "github.com/api7/ingress-controller/pkg/types"
 )
 
 const (
+       // NamespaceAll represents all namespaces.
+       NamespaceAll = "*"
+
        _minimalResyncInterval = 30 * time.Second
 )
 
@@ -45,6 +49,7 @@ type Config struct {
 type KubernetesConfig struct {
        Kubeconfig     string             `json:"kubeconfig" yaml:"kubeconfig"`
        ResyncInterval types.TimeDuration `json:"resync_interval" yaml:"resync_interval"`
+       AppNamespaces  []string           `json:"app_namespaces" yaml:"app_namespaces"`
 }
 
 // APISIXConfig contains all APISIX related config items.
@@ -64,7 +69,8 @@ func NewDefaultConfig() *Config {
                EnableProfiling: true,
                Kubernetes: KubernetesConfig{
                        Kubeconfig:     "", // Use in-cluster configurations.
-                       ResyncInterval: types.TimeDuration{6 * time.Hour},
+                       ResyncInterval: types.TimeDuration{Duration: 6 * time.Hour},
+                       AppNamespaces:  []string{v1.NamespaceAll},
                },
        }
 }
@@ -99,5 +105,21 @@ func (cfg *Config) Validate() error {
        if cfg.APISIX.BaseURL == "" {
                return errors.New("apisix base url is required")
        }
+       cfg.Kubernetes.AppNamespaces = purifyAppNamespaces(cfg.Kubernetes.AppNamespaces)
        return nil
 }
+
+func purifyAppNamespaces(namespaces []string) []string {
+       exists := make(map[string]struct{})
+       var ultimate []string
+       for _, ns := range namespaces {
+               if ns == NamespaceAll {
+                       return []string{v1.NamespaceAll}
+               }
+               if _, ok := exists[ns]; !ok {
+                       ultimate = append(ultimate, ns)
+                       exists[ns] = struct{}{}
+               }
+       }
+       return ultimate
+}
diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go
index 138e764..d220b38 100644
--- a/pkg/config/config_test.go
+++ b/pkg/config/config_test.go
@@ -35,6 +35,7 @@ func TestNewConfigFromFile(t *testing.T) {
                Kubernetes: KubernetesConfig{
                        ResyncInterval: types.TimeDuration{time.Hour},
                        Kubeconfig:     "/path/to/foo/baz",
+                       AppNamespaces:  []string{""},
                },
                APISIX: APISIXConfig{
                        BaseURL:  "http://127.0.0.1:8080/apisix",
diff --git a/pkg/ingress/controller/apisix_route.go 
b/pkg/ingress/controller/apisix_route.go
index 3e033b9..c9938e7 100644
--- a/pkg/ingress/controller/apisix_route.go
+++ b/pkg/ingress/controller/apisix_route.go
@@ -22,7 +22,7 @@ import (
        clientSet 
"github.com/gxthrj/apisix-ingress-types/pkg/client/clientset/versioned"
        api6Scheme 
"github.com/gxthrj/apisix-ingress-types/pkg/client/clientset/versioned/scheme"
        api6Informers 
"github.com/gxthrj/apisix-ingress-types/pkg/client/informers/externalversions/config/v1"
-       "github.com/gxthrj/apisix-ingress-types/pkg/client/listers/config/v1"
+       v1 "github.com/gxthrj/apisix-ingress-types/pkg/client/listers/config/v1"
        "k8s.io/apimachinery/pkg/api/errors"
        "k8s.io/apimachinery/pkg/util/runtime"
        "k8s.io/apimachinery/pkg/util/wait"
@@ -37,6 +37,7 @@ import (
 )
 
 type ApisixRouteController struct {
+       controller           *Controller
        kubeclientset        kubernetes.Interface
        apisixRouteClientset clientSet.Interface
        apisixRouteList      v1.ApisixRouteLister
@@ -53,10 +54,12 @@ type RouteQueueObj struct {
 func BuildApisixRouteController(
        kubeclientset kubernetes.Interface,
        api6RouteClientset clientSet.Interface,
-       api6RouteInformer api6Informers.ApisixRouteInformer) 
*ApisixRouteController {
+       api6RouteInformer api6Informers.ApisixRouteInformer,
+       root *Controller) *ApisixRouteController {
 
        runtime.Must(api6Scheme.AddToScheme(scheme.Scheme))
        controller := &ApisixRouteController{
+               controller:           root,
                kubeclientset:        kubeclientset,
                apisixRouteClientset: api6RouteClientset,
                apisixRouteList:      api6RouteInformer.Lister(),
@@ -79,6 +82,9 @@ func (c *ApisixRouteController) addFunc(obj interface{}) {
                runtime.HandleError(err)
                return
        }
+       if !c.controller.namespaceWatching(key) {
+               return
+       }
        rqo := &RouteQueueObj{Key: key, OldObj: nil, Ope: ADD}
        c.workqueue.AddRateLimited(rqo)
 }
@@ -96,6 +102,9 @@ func (c *ApisixRouteController) updateFunc(oldObj, newObj 
interface{}) {
                runtime.HandleError(err)
                return
        }
+       if !c.controller.namespaceWatching(key) {
+               return
+       }
        rqo := &RouteQueueObj{Key: key, OldObj: oldRoute, Ope: UPDATE}
        c.workqueue.AddRateLimited(rqo)
 }
@@ -119,6 +128,9 @@ func (c *ApisixRouteController) deleteFunc(obj interface{}) 
{
                runtime.HandleError(err)
                return
        }
+       if !c.controller.namespaceWatching(key) {
+               return
+       }
        rqo := &RouteQueueObj{Key: key, OldObj: oldRoute, Ope: DELETE}
        c.workqueue.AddRateLimited(rqo)
 }
diff --git a/pkg/ingress/controller/apisix_service.go 
b/pkg/ingress/controller/apisix_service.go
index a3b18af..50fec6c 100644
--- a/pkg/ingress/controller/apisix_service.go
+++ b/pkg/ingress/controller/apisix_service.go
@@ -22,7 +22,7 @@ import (
        clientSet 
"github.com/gxthrj/apisix-ingress-types/pkg/client/clientset/versioned"
        apisixScheme 
"github.com/gxthrj/apisix-ingress-types/pkg/client/clientset/versioned/scheme"
        informers 
"github.com/gxthrj/apisix-ingress-types/pkg/client/informers/externalversions/config/v1"
-       "github.com/gxthrj/apisix-ingress-types/pkg/client/listers/config/v1"
+       v1 "github.com/gxthrj/apisix-ingress-types/pkg/client/listers/config/v1"
        "k8s.io/apimachinery/pkg/api/errors"
        "k8s.io/apimachinery/pkg/util/runtime"
        "k8s.io/apimachinery/pkg/util/wait"
@@ -37,6 +37,7 @@ import (
 )
 
 type ApisixServiceController struct {
+       controller          *Controller
        kubeclientset       kubernetes.Interface
        apisixClientset     clientSet.Interface
        apisixServiceList   v1.ApisixServiceLister
@@ -47,10 +48,12 @@ type ApisixServiceController struct {
 func BuildApisixServiceController(
        kubeclientset kubernetes.Interface,
        apisixServiceClientset clientSet.Interface,
-       apisixServiceInformer informers.ApisixServiceInformer) 
*ApisixServiceController {
+       apisixServiceInformer informers.ApisixServiceInformer,
+       root *Controller) *ApisixServiceController {
 
        runtime.Must(apisixScheme.AddToScheme(scheme.Scheme))
        controller := &ApisixServiceController{
+               controller:          root,
                kubeclientset:       kubeclientset,
                apisixClientset:     apisixServiceClientset,
                apisixServiceList:   apisixServiceInformer.Lister(),
@@ -159,6 +162,9 @@ func (c *ApisixServiceController) addFunc(obj interface{}) {
                runtime.HandleError(err)
                return
        }
+       if !c.controller.namespaceWatching(key) {
+               return
+       }
        sqo := &ServiceQueueObj{Key: key, OldObj: nil, Ope: ADD}
        c.workqueue.AddRateLimited(sqo)
 }
@@ -175,6 +181,9 @@ func (c *ApisixServiceController) updateFunc(oldObj, newObj 
interface{}) {
                runtime.HandleError(err)
                return
        }
+       if !c.controller.namespaceWatching(key) {
+               return
+       }
        sqo := &ServiceQueueObj{Key: key, OldObj: oldService, Ope: UPDATE}
        c.workqueue.AddRateLimited(sqo)
 }
@@ -199,6 +208,9 @@ func (c *ApisixServiceController) deleteFunc(obj 
interface{}) {
                runtime.HandleError(err)
                return
        }
+       if !c.controller.namespaceWatching(key) {
+               return
+       }
        sqo := &ServiceQueueObj{Key: key, OldObj: oldService, Ope: DELETE}
        c.workqueue.AddRateLimited(sqo)
 }
diff --git a/pkg/ingress/controller/apisix_tls.go 
b/pkg/ingress/controller/apisix_tls.go
index 0510d14..95f1fb4 100644
--- a/pkg/ingress/controller/apisix_tls.go
+++ b/pkg/ingress/controller/apisix_tls.go
@@ -22,7 +22,7 @@ import (
        clientSet 
"github.com/gxthrj/apisix-ingress-types/pkg/client/clientset/versioned"
        apisixScheme 
"github.com/gxthrj/apisix-ingress-types/pkg/client/clientset/versioned/scheme"
        informers 
"github.com/gxthrj/apisix-ingress-types/pkg/client/informers/externalversions/config/v1"
-       "github.com/gxthrj/apisix-ingress-types/pkg/client/listers/config/v1"
+       v1 "github.com/gxthrj/apisix-ingress-types/pkg/client/listers/config/v1"
        "k8s.io/apimachinery/pkg/api/errors"
        "k8s.io/apimachinery/pkg/util/runtime"
        "k8s.io/apimachinery/pkg/util/wait"
@@ -37,6 +37,7 @@ import (
 )
 
 type ApisixTlsController struct {
+       controller      *Controller
        kubeclientset   kubernetes.Interface
        apisixClientset clientSet.Interface
        apisixTlsList   v1.ApisixTlsLister
@@ -53,10 +54,12 @@ type TlsQueueObj struct {
 func BuildApisixTlsController(
        kubeclientset kubernetes.Interface,
        apisixTlsClientset clientSet.Interface,
-       apisixTlsInformer informers.ApisixTlsInformer) *ApisixTlsController {
+       apisixTlsInformer informers.ApisixTlsInformer,
+       root *Controller) *ApisixTlsController {
 
        runtime.Must(apisixScheme.AddToScheme(scheme.Scheme))
        controller := &ApisixTlsController{
+               controller:      root,
                kubeclientset:   kubeclientset,
                apisixClientset: apisixTlsClientset,
                apisixTlsList:   apisixTlsInformer.Lister(),
@@ -161,6 +164,9 @@ func (c *ApisixTlsController) addFunc(obj interface{}) {
                runtime.HandleError(err)
                return
        }
+       if !c.controller.namespaceWatching(key) {
+               return
+       }
        rqo := &TlsQueueObj{Key: key, OldObj: nil, Ope: state.Create}
        c.workqueue.AddRateLimited(rqo)
 }
@@ -177,6 +183,9 @@ func (c *ApisixTlsController) updateFunc(oldObj, newObj 
interface{}) {
                runtime.HandleError(err)
                return
        }
+       if !c.controller.namespaceWatching(key) {
+               return
+       }
        rqo := &TlsQueueObj{Key: key, OldObj: oldTls, Ope: state.Update}
        c.workqueue.AddRateLimited(rqo)
 }
@@ -200,6 +209,9 @@ func (c *ApisixTlsController) deleteFunc(obj interface{}) {
                runtime.HandleError(err)
                return
        }
+       if !c.controller.namespaceWatching(key) {
+               return
+       }
        rqo := &TlsQueueObj{Key: key, OldObj: oldTls, Ope: state.Delete}
        c.workqueue.AddRateLimited(rqo)
 }
diff --git a/pkg/ingress/controller/apisix_upstream.go 
b/pkg/ingress/controller/apisix_upstream.go
index 71e1ca9..2e43fe9 100644
--- a/pkg/ingress/controller/apisix_upstream.go
+++ b/pkg/ingress/controller/apisix_upstream.go
@@ -22,7 +22,7 @@ import (
        clientSet 
"github.com/gxthrj/apisix-ingress-types/pkg/client/clientset/versioned"
        apisixScheme 
"github.com/gxthrj/apisix-ingress-types/pkg/client/clientset/versioned/scheme"
        informers 
"github.com/gxthrj/apisix-ingress-types/pkg/client/informers/externalversions/config/v1"
-       "github.com/gxthrj/apisix-ingress-types/pkg/client/listers/config/v1"
+       v1 "github.com/gxthrj/apisix-ingress-types/pkg/client/listers/config/v1"
        "k8s.io/apimachinery/pkg/api/errors"
        "k8s.io/apimachinery/pkg/util/runtime"
        "k8s.io/apimachinery/pkg/util/wait"
@@ -38,6 +38,7 @@ import (
 )
 
 type ApisixUpstreamController struct {
+       controller           *Controller
        kubeclientset        kubernetes.Interface
        apisixClientset      clientSet.Interface
        apisixUpstreamList   v1.ApisixUpstreamLister
@@ -48,10 +49,12 @@ type ApisixUpstreamController struct {
 func BuildApisixUpstreamController(
        kubeclientset kubernetes.Interface,
        apisixUpstreamClientset clientSet.Interface,
-       apisixUpstreamInformer informers.ApisixUpstreamInformer) 
*ApisixUpstreamController {
+       apisixUpstreamInformer informers.ApisixUpstreamInformer,
+       root *Controller) *ApisixUpstreamController {
 
        runtime.Must(apisixScheme.AddToScheme(scheme.Scheme))
        controller := &ApisixUpstreamController{
+               controller:           root,
                kubeclientset:        kubeclientset,
                apisixClientset:      apisixUpstreamClientset,
                apisixUpstreamList:   apisixUpstreamInformer.Lister(),
@@ -161,6 +164,9 @@ func (c *ApisixUpstreamController) addFunc(obj interface{}) 
{
                runtime.HandleError(err)
                return
        }
+       if !c.controller.namespaceWatching(key) {
+               return
+       }
        sqo := &UpstreamQueueObj{Key: key, OldObj: nil, Ope: ADD}
        c.workqueue.AddRateLimited(sqo)
 }
@@ -202,6 +208,9 @@ func (c *ApisixUpstreamController) deleteFunc(obj 
interface{}) {
                runtime.HandleError(err)
                return
        }
+       if !c.controller.namespaceWatching(key) {
+               return
+       }
        sqo := &UpstreamQueueObj{Key: key, OldObj: oldUpstream, Ope: DELETE}
        c.workqueue.AddRateLimited(sqo)
 }
diff --git a/pkg/ingress/controller/controller.go 
b/pkg/ingress/controller/controller.go
index 7cf38ed..2114ec2 100644
--- a/pkg/ingress/controller/controller.go
+++ b/pkg/ingress/controller/controller.go
@@ -18,6 +18,10 @@ import (
        "os"
        "sync"
 
+       v1 "k8s.io/api/core/v1"
+
+       "k8s.io/client-go/tools/cache"
+
        "github.com/api7/ingress-controller/pkg/apisix"
 
        clientSet 
"github.com/gxthrj/apisix-ingress-types/pkg/client/clientset/versioned"
@@ -44,6 +48,7 @@ func recoverException() {
 // Controller is the ingress apisix controller object.
 type Controller struct {
        wg                 sync.WaitGroup
+       watchingNamespace  map[string]struct{}
        apiServer          *api.Server
        clientset          kubernetes.Interface
        crdClientset       crdclientset.Interface
@@ -82,12 +87,21 @@ func NewController(cfg *config.Config) (*Controller, error) 
{
        crdClientset := kube.GetApisixClient()
        sharedInformerFactory := 
externalversions.NewSharedInformerFactory(crdClientset, 
cfg.Kubernetes.ResyncInterval.Duration)
 
+       var watchingNamespace map[string]struct{}
+       if len(cfg.Kubernetes.AppNamespaces) > 1 || cfg.Kubernetes.AppNamespaces[0] != v1.NamespaceAll {
+               watchingNamespace = make(map[string]struct{}, len(cfg.Kubernetes.AppNamespaces))
+               for _, ns := range cfg.Kubernetes.AppNamespaces {
+                       watchingNamespace[ns] = struct{}{}
+               }
+       }
+
        c := &Controller{
                apiServer:          apiSrv,
                metricsCollector:   metrics.NewPrometheusCollector(podName, 
podNamespace),
                clientset:          kube.GetKubeClient(),
                crdClientset:       crdClientset,
                crdInformerFactory: sharedInformerFactory,
+               watchingNamespace:  watchingNamespace,
        }
 
        return c, nil
@@ -117,7 +131,7 @@ func (c *Controller) Run(stop chan struct{}) error {
        epInformer := ac.CoreSharedInformerFactory.Core().V1().Endpoints()
        kube.EndpointsInformer = epInformer
        // endpoint
-       ac.Endpoint()
+       ac.Endpoint(c)
        c.goAttach(func() {
                ac.CoreSharedInformerFactory.Start(stop)
        })
@@ -128,13 +142,13 @@ func (c *Controller) Run(stop chan struct{}) error {
        })
 
        // ApisixRoute
-       ac.ApisixRoute()
+       ac.ApisixRoute(c)
        // ApisixUpstream
-       ac.ApisixUpstream()
+       ac.ApisixUpstream(c)
        // ApisixService
-       ac.ApisixService()
+       ac.ApisixService(c)
        // ApisixTLS
-       ac.ApisixTLS()
+       ac.ApisixTLS(c)
 
        c.goAttach(func() {
                ac.SharedInformerFactory.Start(stop)
@@ -145,6 +159,24 @@ func (c *Controller) Run(stop chan struct{}) error {
        return nil
 }
 
+// namespaceWatching accepts a resource key, getting the namespace part
+// and checking whether the namespace is being watched.
+func (c *Controller) namespaceWatching(key string) (ok bool) {
+       if c.watchingNamespace == nil {
+               ok = true
+               return
+       }
+       ns, _, err := cache.SplitMetaNamespaceKey(key)
+       if err != nil {
+               // Ignore resource with invalid key.
+               ok = false
+               log.Warnf("resource %s was ignored since: %s", key, err)
+               return
+       }
+       _, ok = c.watchingNamespace[ns]
+       return
+}
+
 type Api6Controller struct {
        KubeClientSet             kubernetes.Interface
        Api6ClientSet             clientSet.Interface
@@ -153,40 +185,44 @@ type Api6Controller struct {
        Stop                      chan struct{}
 }
 
-func (api6 *Api6Controller) ApisixRoute() {
+func (api6 *Api6Controller) ApisixRoute(controller *Controller) {
        arc := BuildApisixRouteController(
                api6.KubeClientSet,
                api6.Api6ClientSet,
-               api6.SharedInformerFactory.Apisix().V1().ApisixRoutes())
+               api6.SharedInformerFactory.Apisix().V1().ApisixRoutes(),
+               controller)
        arc.Run(api6.Stop)
 }
 
-func (api6 *Api6Controller) ApisixUpstream() {
+func (api6 *Api6Controller) ApisixUpstream(controller *Controller) {
        auc := BuildApisixUpstreamController(
                api6.KubeClientSet,
                api6.Api6ClientSet,
-               api6.SharedInformerFactory.Apisix().V1().ApisixUpstreams())
+               api6.SharedInformerFactory.Apisix().V1().ApisixUpstreams(),
+               controller)
        auc.Run(api6.Stop)
 }
 
-func (api6 *Api6Controller) ApisixService() {
+func (api6 *Api6Controller) ApisixService(controller *Controller) {
        auc := BuildApisixServiceController(
                api6.KubeClientSet,
                api6.Api6ClientSet,
-               api6.SharedInformerFactory.Apisix().V1().ApisixServices())
+               api6.SharedInformerFactory.Apisix().V1().ApisixServices(),
+               controller)
        auc.Run(api6.Stop)
 }
 
-func (api6 *Api6Controller) ApisixTLS() {
+func (api6 *Api6Controller) ApisixTLS(controller *Controller) {
        auc := BuildApisixTlsController(
                api6.KubeClientSet,
                api6.Api6ClientSet,
-               api6.SharedInformerFactory.Apisix().V1().ApisixTlses())
+               api6.SharedInformerFactory.Apisix().V1().ApisixTlses(),
+               controller)
        auc.Run(api6.Stop)
 }
 
-func (api6 *Api6Controller) Endpoint() {
-       auc := BuildEndpointController(api6.KubeClientSet)
+func (api6 *Api6Controller) Endpoint(controller *Controller) {
+       auc := BuildEndpointController(api6.KubeClientSet, controller)
        //conf.EndpointsInformer)
        auc.Run(api6.Stop)
 }
diff --git a/pkg/ingress/controller/endpoint.go 
b/pkg/ingress/controller/endpoint.go
index 396e2a4..6c7a7e1 100644
--- a/pkg/ingress/controller/endpoint.go
+++ b/pkg/ingress/controller/endpoint.go
@@ -38,14 +38,16 @@ import (
 )
 
 type EndpointController struct {
+       controller     *Controller
        kubeclientset  kubernetes.Interface
        endpointList   CoreListerV1.EndpointsLister
        endpointSynced cache.InformerSynced
        workqueue      workqueue.RateLimitingInterface
 }
 
-func BuildEndpointController(kubeclientset kubernetes.Interface) 
*EndpointController {
+func BuildEndpointController(kubeclientset kubernetes.Interface, root 
*Controller) *EndpointController {
        controller := &EndpointController{
+               controller:     root,
                kubeclientset:  kubeclientset,
                endpointList:   kube.EndpointsInformer.Lister(),
                endpointSynced: kube.EndpointsInformer.Informer().HasSynced,
@@ -215,6 +217,9 @@ func (c *EndpointController) addFunc(obj interface{}) {
                runtime.HandleError(err)
                return
        }
+       if !c.controller.namespaceWatching(key) {
+               return
+       }
        c.workqueue.AddRateLimited(key)
 }
 
@@ -235,5 +240,8 @@ func (c *EndpointController) deleteFunc(obj interface{}) {
                runtime.HandleError(err)
                return
        }
+       if !c.controller.namespaceWatching(key) {
+               return
+       }
        c.workqueue.AddRateLimited(key)
 }
diff --git a/test/e2e/ingress/namespace.go b/test/e2e/ingress/namespace.go
new file mode 100644
index 0000000..23601ff
--- /dev/null
+++ b/test/e2e/ingress/namespace.go
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ingress
+
+import (
+       "encoding/json"
+       "fmt"
+       "net/http"
+
+       "github.com/api7/ingress-controller/test/e2e/scaffold"
+       "github.com/onsi/ginkgo"
+       "github.com/stretchr/testify/assert"
+)
+
+var _ = ginkgo.Describe("namespacing filtering", func() {
+       s := scaffold.NewDefaultScaffold()
+       ginkgo.It("resources in other namespaces should be ignored", func() {
+               backendSvc, backendSvcPort := s.DefaultHTTPBackend()
+               route := fmt.Sprintf(`
+apiVersion: apisix.apache.org/v1
+kind: ApisixRoute
+metadata:
+  name: httpbin-route
+spec:
+  rules:
+  - host: httpbin.com
+    http:
+      paths:
+      - backend:
+          serviceName: %s
+          servicePort: %d
+        path: /ip
+`, backendSvc, backendSvcPort[0])
+
+               assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(route), 
"creating ApisixRoute")
+               assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixRoutesCreated(1), 
"checking number of routes")
+               assert.Nil(ginkgo.GinkgoT(), 
s.EnsureNumApisixUpstreamsCreated(1), "checking number of upstreams")
+
+               body := s.NewAPISIXClient().GET("/ip").WithHeader("Host", 
"httpbin.com").Expect().Status(http.StatusOK).Body().Raw()
+               var placeholder ip
+               err := json.Unmarshal([]byte(body), &placeholder)
+               assert.Nil(ginkgo.GinkgoT(), err, "unmarshalling IP")
+
+               // Now create another ApisixRoute in default namespace.
+               route = fmt.Sprintf(`
+apiVersion: apisix.apache.org/v1
+kind: ApisixRoute
+metadata:
+ name: httpbin-route
+spec:
+ rules:
+ - host: httpbin.com
+   http:
+     paths:
+     - backend:
+         serviceName: %s
+         servicePort: %d
+       path: /headers
+`, backendSvc, backendSvcPort[0])
+
+               assert.Nil(ginkgo.GinkgoT(), 
s.CreateResourceFromStringWithNamespace(route, "default"), "creating 
ApisixRoute")
+               _ = s.NewAPISIXClient().GET("/headers").WithHeader("Host", 
"httpbin.com").Expect().Status(http.StatusNotFound)
+       })
+})
diff --git a/test/e2e/ingress/resourcepushing.go 
b/test/e2e/ingress/resourcepushing.go
index 572e342..2ffba3f 100644
--- a/test/e2e/ingress/resourcepushing.go
+++ b/test/e2e/ingress/resourcepushing.go
@@ -15,6 +15,7 @@
 package ingress
 
 import (
+       "fmt"
        "time"
 
        "github.com/onsi/ginkgo"
@@ -26,7 +27,8 @@ import (
 var _ = ginkgo.Describe("ApisixRoute Testing", func() {
        s := scaffold.NewDefaultScaffold()
        ginkgo.It("create and then scale upstream pods to 2 ", func() {
-               apisixRoute := `
+               backendSvc, backendSvcPort := s.DefaultHTTPBackend()
+               apisixRoute := fmt.Sprintf(`
 apiVersion: apisix.apache.org/v1
 kind: ApisixRoute
 metadata:
@@ -37,11 +39,11 @@ spec:
    http:
      paths:
      - backend:
-         serviceName: httpbin-service-e2e-test
-         servicePort: 80
+         serviceName: %s
+         servicePort: %d
        path: /ip
-`
-               s.CreateApisixRouteByString(apisixRoute)
+`, backendSvc, backendSvcPort[0])
+               assert.Nil(ginkgo.GinkgoT(), 
s.CreateResourceFromString(apisixRoute))
 
                err := s.EnsureNumApisixRoutesCreated(1)
                assert.Nil(ginkgo.GinkgoT(), err, "Checking number of routes")
@@ -50,14 +52,15 @@ spec:
                scale := 2
                err = s.ScaleHTTPBIN(scale)
                assert.Nil(ginkgo.GinkgoT(), err)
-               time.Sleep(10 * time.Second) // wait for ingress to sync
+               time.Sleep(5 * time.Second) // wait for ingress to sync
                ups, err := s.ListApisixUpstreams()
                assert.Nil(ginkgo.GinkgoT(), err, "list upstreams error")
                assert.Len(ginkgo.GinkgoT(), ups[0].Nodes, 2, "upstreams nodes 
not expect")
        })
 
-       ginkgo.It("create and then remove ", func() {
-               apisixRoute := `
+       ginkgo.It("create and then remove", func() {
+               backendSvc, backendSvcPort := s.DefaultHTTPBackend()
+               apisixRoute := fmt.Sprintf(`
 apiVersion: apisix.apache.org/v1
 kind: ApisixRoute
 metadata:
@@ -68,25 +71,22 @@ spec:
     http:
       paths:
       - backend:
-          serviceName: httpbin-service-e2e-test
-          servicePort: 80
+          serviceName: %s
+          servicePort: %d
         path: /ip
-`
-               s.CreateApisixRouteByString(apisixRoute)
+`, backendSvc, backendSvcPort[0])
 
+               assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(apisixRoute), "creating ApisixRoute")
                err := s.EnsureNumApisixRoutesCreated(1)
                assert.Nil(ginkgo.GinkgoT(), err, "Checking number of routes")
                err = s.EnsureNumApisixUpstreamsCreated(1)
                assert.Nil(ginkgo.GinkgoT(), err, "Checking number of upstreams")
-               ups, err := s.ListApisixUpstreams()
-               assert.Nil(ginkgo.GinkgoT(), err, "list upstreams error")
-               assert.Len(ginkgo.GinkgoT(), ups[0].Nodes, 1, "upstreams nodes not expect")
 
                // remove
-               s.CreateApisixRouteByString(apisixRoute)
-               time.Sleep(5 * time.Second) // wait for ingress to sync
-               ups, err = s.ListApisixUpstreams()
+               assert.Nil(ginkgo.GinkgoT(), s.RemoveResourceByString(apisixRoute))
+               time.Sleep(10 * time.Second) // wait for ingress to sync
+               ups, err := s.ListApisixUpstreams()
                assert.Nil(ginkgo.GinkgoT(), err, "list upstreams error")
-               assert.Len(ginkgo.GinkgoT(), len(ups), 0, "upstreams nodes not expect")
+               assert.Len(ginkgo.GinkgoT(), ups, 0, "upstreams nodes not expect")
        })
 })
diff --git a/test/e2e/scaffold/crd.go b/test/e2e/scaffold/crd.go
index 29338ae..b955270 100644
--- a/test/e2e/scaffold/crd.go
+++ b/test/e2e/scaffold/crd.go
@@ -91,12 +91,33 @@ func (s *Scaffold) CreateApisixRoute(name string, rules []ApisixRouteRule) {
        k8s.KubectlApplyFromString(s.t, s.kubectlOptions, string(data))
 }
 
-func (s *Scaffold) CreateApisixRouteByString(yaml string) {
-       k8s.KubectlApplyFromString(s.t, s.kubectlOptions, yaml)
+// CreateResourceFromString creates resource from a loaded yaml string.
+func (s *Scaffold) CreateResourceFromString(yaml string) error {
+       return k8s.KubectlApplyFromStringE(s.t, s.kubectlOptions, yaml)
+}
 
+// RemoveResourceByString removes resource from a loaded yaml string.
+func (s *Scaffold) RemoveResourceByString(yaml string) error {
+       return k8s.KubectlDeleteFromStringE(s.t, s.kubectlOptions, yaml)
 }
-func (s *Scaffold) RemoveApisixRouteByString(yaml string) {
-       k8s.KubectlDeleteFromString(s.t, s.kubectlOptions, yaml)
+
+// CreateResourceFromStringWithNamespace creates resource from a loaded yaml string
+// and sets its namespace to the specified one.
+func (s *Scaffold) CreateResourceFromStringWithNamespace(yaml, namespace string) error {
+       originalNamespace := s.kubectlOptions.Namespace
+       s.kubectlOptions.Namespace = namespace
+       defer func() {
+               s.kubectlOptions.Namespace = originalNamespace
+       }()
+       s.addFinializer(func() {
+               originalNamespace := s.kubectlOptions.Namespace
+               s.kubectlOptions.Namespace = namespace
+               defer func() {
+                       s.kubectlOptions.Namespace = originalNamespace
+               }()
+               assert.Nil(s.t, k8s.KubectlDeleteFromStringE(s.t, s.kubectlOptions, yaml))
+       })
+       return k8s.KubectlApplyFromStringE(s.t, s.kubectlOptions, yaml)
 }
 
 func ensureNumApisixCRDsCreated(url string, desired int) error {
diff --git a/test/e2e/scaffold/ingress.go b/test/e2e/scaffold/ingress.go
index 361908b..9a6de56 100644
--- a/test/e2e/scaffold/ingress.go
+++ b/test/e2e/scaffold/ingress.go
@@ -29,7 +29,7 @@ const (
 apiVersion: rbac.authorization.k8s.io/v1
 kind: ClusterRoleBinding
 metadata:
-  name: ingress-apisix-e2e-test-clusterrolebinding
+  name: %s-clusterrolebinding
 roleRef:
   apiGroup: rbac.authorization.k8s.io
   kind: ClusterRole
@@ -39,7 +39,7 @@ subjects:
   name: ingress-apisix-e2e-test-service-account
   namespace: %s
 `
-       _ingressAPISIXDeployment = `
+       _ingressAPISIXDeploymentTemplate = `
 apiVersion: apps/v1
 kind: Deployment
 metadata:
@@ -51,7 +51,7 @@ spec:
       app: ingress-apisix-controller-deployment-e2e-test
   strategy:
     rollingUpdate:
-      maxSurge: 50%
+      maxSurge: 50%%
       maxUnavailable: 1
     type: RollingUpdate
   template:
@@ -78,7 +78,7 @@ spec:
               port: 8080
             timeoutSeconds: 2
           image: "apache/apisix-ingress-controller:dev"
-          imagePullPolicy: IfNotPresent
+          imagePullPolicy: Never
           name: ingress-apisix-controller-deployment-e2e-test
           ports:
             - containerPort: 8080
@@ -95,20 +95,23 @@ spec:
             - :8080
             - --apisix-base-url
             - http://apisix-service-e2e-test:9180/apisix/admin
+            - --app-namespace
+            - %s
       serviceAccount: ingress-apisix-e2e-test-service-account
 `
 )
 
 func (s *Scaffold) newIngressAPISIXController() error {
+       ingressAPISIXDeployment := fmt.Sprintf(_ingressAPISIXDeploymentTemplate, s.namespace)
        if err := k8s.CreateServiceAccountE(s.t, s.kubectlOptions, _serviceAccount); err != nil {
                return err
        }
 
-       crb := fmt.Sprintf(_clusterRoleBinding, s.namespace)
+       crb := fmt.Sprintf(_clusterRoleBinding, s.namespace, s.namespace)
        if err := k8s.KubectlApplyFromStringE(s.t, s.kubectlOptions, crb); err != nil {
                return err
        }
-       if err := k8s.KubectlApplyFromStringE(s.t, s.kubectlOptions, _ingressAPISIXDeployment); err != nil {
+       if err := k8s.KubectlApplyFromStringE(s.t, s.kubectlOptions, ingressAPISIXDeployment); err != nil {
                return err
        }
        return nil
diff --git a/test/e2e/scaffold/scaffold.go b/test/e2e/scaffold/scaffold.go
index 6a5a711..fdd1998 100644
--- a/test/e2e/scaffold/scaffold.go
+++ b/test/e2e/scaffold/scaffold.go
@@ -53,6 +53,7 @@ type Scaffold struct {
        apisixService     *corev1.Service
        httpbinDeployment *appsv1.Deployment
        httpbinService    *corev1.Service
+       finializers       []func()
 
        // Used for template rendering.
        EtcdServiceFQDN string
@@ -133,14 +134,6 @@ func (s *Scaffold) NewAPISIXClient() *httpexpect.Expect {
        })
 }
 
-func (s *Scaffold) BeforeEach() {
-       s.beforeEach()
-}
-
-func (s *Scaffold) AfterEach() {
-       s.afterEach()
-}
-
 func (s *Scaffold) beforeEach() {
        var err error
        s.namespace = fmt.Sprintf("ingress-apisix-e2e-tests-%s-%d", s.opts.Name, time.Now().Nanosecond())
@@ -157,7 +150,6 @@ func (s *Scaffold) beforeEach() {
        assert.Nil(s.t, err, "initializing etcd")
 
        // We don't use k8s.WaitUntilServiceAvailable since it hacks for Minikube.
-       //err = s.waitAllEtcdPodsAvailable()
        err = k8s.WaitUntilNumPodsCreatedE(s.t, s.kubectlOptions, s.labelSelector("app=etcd-deployment-e2e-test"), 1, 5, 2*time.Second)
        assert.Nil(s.t, err, "waiting for etcd ready")
 
@@ -182,8 +174,16 @@ func (s *Scaffold) beforeEach() {
 
 func (s *Scaffold) afterEach() {
        defer ginkgo.GinkgoRecover()
-       //err := k8s.DeleteNamespaceE(s.t, s.kubectlOptions, s.namespace)
-       //assert.Nilf(ginkgo.GinkgoT(), err, "deleting namespace %s", s.namespace)
+       err := k8s.DeleteNamespaceE(s.t, s.kubectlOptions, s.namespace)
+       assert.Nilf(ginkgo.GinkgoT(), err, "deleting namespace %s", s.namespace)
+
+       for _, f := range s.finializers {
+               f()
+       }
+}
+
+func (s *Scaffold) addFinializer(f func()) {
+       s.finializers = append(s.finializers, f)
 }
 
 func (s *Scaffold) renderConfig(path string) (string, error) {

Reply via email to