This is an automated email from the ASF dual-hosted git repository.
kvn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push:
new 1669bba chore: namespaces filtering (#162)
1669bba is described below
commit 1669bba20ad775fd4ec162fb33fce87b407d4204
Author: Alex Zhang <[email protected]>
AuthorDate: Mon Jan 11 18:48:02 2021 +0800
chore: namespaces filtering (#162)
* chore: namespaces filtering
* feat: run e2e cases in parallel
* test: add e2e case to cover the namespacing filtering feature
* test: always build apisix-ingress-controller from current branch before
running e2e cases
* fix: broken e2e test cases
---
Makefile | 15 ++++--
cmd/ingress/ingress.go | 5 ++
conf/config-default.yaml | 2 +
pkg/config/config.go | 24 +++++++++-
pkg/config/config_test.go | 1 +
pkg/ingress/controller/apisix_route.go | 16 ++++++-
pkg/ingress/controller/apisix_service.go | 16 ++++++-
pkg/ingress/controller/apisix_tls.go | 16 ++++++-
pkg/ingress/controller/apisix_upstream.go | 13 +++++-
pkg/ingress/controller/controller.go | 66 ++++++++++++++++++++------
pkg/ingress/controller/endpoint.go | 10 +++-
test/e2e/ingress/namespace.go | 77 +++++++++++++++++++++++++++++++
test/e2e/ingress/resourcepushing.go | 38 +++++++--------
test/e2e/scaffold/crd.go | 29 ++++++++++--
test/e2e/scaffold/ingress.go | 15 +++---
test/e2e/scaffold/scaffold.go | 22 ++++-----
16 files changed, 296 insertions(+), 69 deletions(-)
diff --git a/Makefile b/Makefile
index 679cf14..efd28c7 100644
--- a/Makefile
+++ b/Makefile
@@ -17,7 +17,7 @@
default: help
VERSION ?= 0.1.0
-IMAGE_TAG ?= "latest"
+IMAGE_TAG ?= "dev"
GITSHA ?= $(shell git rev-parse --short=7 HEAD)
OSNAME ?= $(shell uname -s | tr A-Z a-z)
OSARCH ?= $(shell uname -m | tr A-Z a-z)
@@ -40,7 +40,7 @@ build:
### build-image: Build apisix-ingress-controller image
build-image:
- docker build -t apisix-ingress-controller:$(IMAGE_TAG) .
+ docker build -t apache/apisix-ingress-controller:$(IMAGE_TAG) .
### lint: Do static lint check
lint:
@@ -54,14 +54,21 @@ gofmt:
unit-test:
go test -cover -coverprofile=coverage.txt ./...
-### e2e-test: Run e2e test cases
-e2e-test:
+### e2e-test: Run e2e test cases (minikube is required)
+e2e-test: build-image-to-minikube
export
APISIX_ROUTE_DEF=$(PWD)/samples/deploy/crd/v1beta1/ApisixRoute.yaml && \
export
APISIX_UPSTREAM_DEF=$(PWD)/samples/deploy/crd/v1beta1/ApisixUpstream.yaml && \
export
APISIX_SERVICE_DEF=$(PWD)/samples/deploy/crd/v1beta1/ApisixService.yaml && \
export APISIX_TLS_DEF=$(PWD)/samples/deploy/crd/v1beta1/ApisixTls.yaml
&& \
cd test/e2e && ginkgo -cover -coverprofile=coverage.txt -r
--randomizeSuites --randomizeAllSpecs --trace -p --nodes=1
+# build images to minikube node directly, it's an internal directive, so don't
+# expose it's help message.
+build-image-to-minikube:
+ @minikube version > /dev/null 2>&1 || (echo "ERROR: minikube is
required."; exit 1)
+ @eval $$(minikube docker-env);\
+ docker build -t apache/apisix-ingress-controller:$(IMAGE_TAG) .
+
### license-check: Do Apache License Header check
license-check:
ifeq ("$(wildcard .actions/openwhisk-utilities/scancode/scanCode.py)", "")
diff --git a/cmd/ingress/ingress.go b/cmd/ingress/ingress.go
index 9cac72f..66a7748 100644
--- a/cmd/ingress/ingress.go
+++ b/cmd/ingress/ingress.go
@@ -82,6 +82,10 @@ the apisix cluster and others are created`,
}
cfg = c
}
+ if err := cfg.Validate(); err != nil {
+ dief("bad configuration: %s", err)
+ }
+
logger, err := log.NewLogger(
log.WithLogLevel(cfg.LogLevel),
log.WithOutputFile(cfg.LogOutput),
@@ -121,6 +125,7 @@ the apisix cluster and others are created`,
cmd.PersistentFlags().BoolVar(&cfg.EnableProfiling, "enable-profiling",
true, "enable profiling via web interface host:port/debug/pprof")
cmd.PersistentFlags().StringVar(&cfg.Kubernetes.Kubeconfig,
"kubeconfig", "", "Kubernetes configuration file (by default in-cluster
configuration will be used)")
cmd.PersistentFlags().DurationVar(&cfg.Kubernetes.ResyncInterval.Duration,
"resync-interval", time.Minute, "the controller resync (with Kubernetes)
interval, the minimum resync interval is 30s")
+ cmd.PersistentFlags().StringSliceVar(&cfg.Kubernetes.AppNamespaces,
"app-namespace", []string{config.NamespaceAll}, "namespaces that controller
will watch for resources")
cmd.PersistentFlags().StringVar(&cfg.APISIX.BaseURL, "apisix-base-url",
"", "the base URL for APISIX admin api / manager api")
cmd.PersistentFlags().StringVar(&cfg.APISIX.AdminKey,
"apisix-admin-key", "", "admin key used for the authorization of APISIX admin
api / manager api")
diff --git a/conf/config-default.yaml b/conf/config-default.yaml
index 4ae211d..5e34911 100644
--- a/conf/config-default.yaml
+++ b/conf/config-default.yaml
@@ -38,6 +38,8 @@ kubernetes:
resync_interval: "6h" # how long should apisix-ingress-controller
# re-synchronizes with Kubernetes, default is 6h,
# and the minimal resync interval is 30s.
+ app_namespaces: ["*"] # namespace list that controller will watch for
resources,
+ # by default all namespaces (represented by "*") are
watched.
# APISIX related configurations.
apisix:
diff --git a/pkg/config/config.go b/pkg/config/config.go
index 75d00a4..4a00383 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -22,11 +22,15 @@ import (
"time"
"gopkg.in/yaml.v2"
+ v1 "k8s.io/api/core/v1"
"github.com/api7/ingress-controller/pkg/types"
)
const (
+ // NamespaceAll represents all namespaces.
+ NamespaceAll = "*"
+
_minimalResyncInterval = 30 * time.Second
)
@@ -45,6 +49,7 @@ type Config struct {
type KubernetesConfig struct {
Kubeconfig string `json:"kubeconfig" yaml:"kubeconfig"`
ResyncInterval types.TimeDuration `json:"resync_interval"
yaml:"resync_interval"`
+ AppNamespaces []string `json:"app_namespaces"
yaml:"app_namespaces"`
}
// APISIXConfig contains all APISIX related config items.
@@ -64,7 +69,8 @@ func NewDefaultConfig() *Config {
EnableProfiling: true,
Kubernetes: KubernetesConfig{
Kubeconfig: "", // Use in-cluster configurations.
- ResyncInterval: types.TimeDuration{6 * time.Hour},
+ ResyncInterval: types.TimeDuration{Duration: 6 *
time.Hour},
+ AppNamespaces: []string{v1.NamespaceAll},
},
}
}
@@ -99,5 +105,21 @@ func (cfg *Config) Validate() error {
if cfg.APISIX.BaseURL == "" {
return errors.New("apisix base url is required")
}
+ cfg.Kubernetes.AppNamespaces =
purifyAppNamespaces(cfg.Kubernetes.AppNamespaces)
return nil
}
+
+func purifyAppNamespaces(namespaces []string) []string {
+ exists := make(map[string]struct{})
+ var ultimate []string
+ for _, ns := range namespaces {
+ if ns == NamespaceAll {
+ return []string{v1.NamespaceAll}
+ }
+ if _, ok := exists[ns]; !ok {
+ ultimate = append(ultimate, ns)
+ exists[ns] = struct{}{}
+ }
+ }
+ return ultimate
+}
diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go
index 138e764..d220b38 100644
--- a/pkg/config/config_test.go
+++ b/pkg/config/config_test.go
@@ -35,6 +35,7 @@ func TestNewConfigFromFile(t *testing.T) {
Kubernetes: KubernetesConfig{
ResyncInterval: types.TimeDuration{time.Hour},
Kubeconfig: "/path/to/foo/baz",
+ AppNamespaces: []string{""},
},
APISIX: APISIXConfig{
BaseURL: "http://127.0.0.1:8080/apisix",
diff --git a/pkg/ingress/controller/apisix_route.go
b/pkg/ingress/controller/apisix_route.go
index 3e033b9..c9938e7 100644
--- a/pkg/ingress/controller/apisix_route.go
+++ b/pkg/ingress/controller/apisix_route.go
@@ -22,7 +22,7 @@ import (
clientSet
"github.com/gxthrj/apisix-ingress-types/pkg/client/clientset/versioned"
api6Scheme
"github.com/gxthrj/apisix-ingress-types/pkg/client/clientset/versioned/scheme"
api6Informers
"github.com/gxthrj/apisix-ingress-types/pkg/client/informers/externalversions/config/v1"
- "github.com/gxthrj/apisix-ingress-types/pkg/client/listers/config/v1"
+ v1 "github.com/gxthrj/apisix-ingress-types/pkg/client/listers/config/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
@@ -37,6 +37,7 @@ import (
)
type ApisixRouteController struct {
+ controller *Controller
kubeclientset kubernetes.Interface
apisixRouteClientset clientSet.Interface
apisixRouteList v1.ApisixRouteLister
@@ -53,10 +54,12 @@ type RouteQueueObj struct {
func BuildApisixRouteController(
kubeclientset kubernetes.Interface,
api6RouteClientset clientSet.Interface,
- api6RouteInformer api6Informers.ApisixRouteInformer)
*ApisixRouteController {
+ api6RouteInformer api6Informers.ApisixRouteInformer,
+ root *Controller) *ApisixRouteController {
runtime.Must(api6Scheme.AddToScheme(scheme.Scheme))
controller := &ApisixRouteController{
+ controller: root,
kubeclientset: kubeclientset,
apisixRouteClientset: api6RouteClientset,
apisixRouteList: api6RouteInformer.Lister(),
@@ -79,6 +82,9 @@ func (c *ApisixRouteController) addFunc(obj interface{}) {
runtime.HandleError(err)
return
}
+ if !c.controller.namespaceWatching(key) {
+ return
+ }
rqo := &RouteQueueObj{Key: key, OldObj: nil, Ope: ADD}
c.workqueue.AddRateLimited(rqo)
}
@@ -96,6 +102,9 @@ func (c *ApisixRouteController) updateFunc(oldObj, newObj
interface{}) {
runtime.HandleError(err)
return
}
+ if !c.controller.namespaceWatching(key) {
+ return
+ }
rqo := &RouteQueueObj{Key: key, OldObj: oldRoute, Ope: UPDATE}
c.workqueue.AddRateLimited(rqo)
}
@@ -119,6 +128,9 @@ func (c *ApisixRouteController) deleteFunc(obj interface{})
{
runtime.HandleError(err)
return
}
+ if !c.controller.namespaceWatching(key) {
+ return
+ }
rqo := &RouteQueueObj{Key: key, OldObj: oldRoute, Ope: DELETE}
c.workqueue.AddRateLimited(rqo)
}
diff --git a/pkg/ingress/controller/apisix_service.go
b/pkg/ingress/controller/apisix_service.go
index a3b18af..50fec6c 100644
--- a/pkg/ingress/controller/apisix_service.go
+++ b/pkg/ingress/controller/apisix_service.go
@@ -22,7 +22,7 @@ import (
clientSet
"github.com/gxthrj/apisix-ingress-types/pkg/client/clientset/versioned"
apisixScheme
"github.com/gxthrj/apisix-ingress-types/pkg/client/clientset/versioned/scheme"
informers
"github.com/gxthrj/apisix-ingress-types/pkg/client/informers/externalversions/config/v1"
- "github.com/gxthrj/apisix-ingress-types/pkg/client/listers/config/v1"
+ v1 "github.com/gxthrj/apisix-ingress-types/pkg/client/listers/config/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
@@ -37,6 +37,7 @@ import (
)
type ApisixServiceController struct {
+ controller *Controller
kubeclientset kubernetes.Interface
apisixClientset clientSet.Interface
apisixServiceList v1.ApisixServiceLister
@@ -47,10 +48,12 @@ type ApisixServiceController struct {
func BuildApisixServiceController(
kubeclientset kubernetes.Interface,
apisixServiceClientset clientSet.Interface,
- apisixServiceInformer informers.ApisixServiceInformer)
*ApisixServiceController {
+ apisixServiceInformer informers.ApisixServiceInformer,
+ root *Controller) *ApisixServiceController {
runtime.Must(apisixScheme.AddToScheme(scheme.Scheme))
controller := &ApisixServiceController{
+ controller: root,
kubeclientset: kubeclientset,
apisixClientset: apisixServiceClientset,
apisixServiceList: apisixServiceInformer.Lister(),
@@ -159,6 +162,9 @@ func (c *ApisixServiceController) addFunc(obj interface{}) {
runtime.HandleError(err)
return
}
+ if !c.controller.namespaceWatching(key) {
+ return
+ }
sqo := &ServiceQueueObj{Key: key, OldObj: nil, Ope: ADD}
c.workqueue.AddRateLimited(sqo)
}
@@ -175,6 +181,9 @@ func (c *ApisixServiceController) updateFunc(oldObj, newObj
interface{}) {
runtime.HandleError(err)
return
}
+ if !c.controller.namespaceWatching(key) {
+ return
+ }
sqo := &ServiceQueueObj{Key: key, OldObj: oldService, Ope: UPDATE}
c.workqueue.AddRateLimited(sqo)
}
@@ -199,6 +208,9 @@ func (c *ApisixServiceController) deleteFunc(obj
interface{}) {
runtime.HandleError(err)
return
}
+ if !c.controller.namespaceWatching(key) {
+ return
+ }
sqo := &ServiceQueueObj{Key: key, OldObj: oldService, Ope: DELETE}
c.workqueue.AddRateLimited(sqo)
}
diff --git a/pkg/ingress/controller/apisix_tls.go
b/pkg/ingress/controller/apisix_tls.go
index 0510d14..95f1fb4 100644
--- a/pkg/ingress/controller/apisix_tls.go
+++ b/pkg/ingress/controller/apisix_tls.go
@@ -22,7 +22,7 @@ import (
clientSet
"github.com/gxthrj/apisix-ingress-types/pkg/client/clientset/versioned"
apisixScheme
"github.com/gxthrj/apisix-ingress-types/pkg/client/clientset/versioned/scheme"
informers
"github.com/gxthrj/apisix-ingress-types/pkg/client/informers/externalversions/config/v1"
- "github.com/gxthrj/apisix-ingress-types/pkg/client/listers/config/v1"
+ v1 "github.com/gxthrj/apisix-ingress-types/pkg/client/listers/config/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
@@ -37,6 +37,7 @@ import (
)
type ApisixTlsController struct {
+ controller *Controller
kubeclientset kubernetes.Interface
apisixClientset clientSet.Interface
apisixTlsList v1.ApisixTlsLister
@@ -53,10 +54,12 @@ type TlsQueueObj struct {
func BuildApisixTlsController(
kubeclientset kubernetes.Interface,
apisixTlsClientset clientSet.Interface,
- apisixTlsInformer informers.ApisixTlsInformer) *ApisixTlsController {
+ apisixTlsInformer informers.ApisixTlsInformer,
+ root *Controller) *ApisixTlsController {
runtime.Must(apisixScheme.AddToScheme(scheme.Scheme))
controller := &ApisixTlsController{
+ controller: root,
kubeclientset: kubeclientset,
apisixClientset: apisixTlsClientset,
apisixTlsList: apisixTlsInformer.Lister(),
@@ -161,6 +164,9 @@ func (c *ApisixTlsController) addFunc(obj interface{}) {
runtime.HandleError(err)
return
}
+ if !c.controller.namespaceWatching(key) {
+ return
+ }
rqo := &TlsQueueObj{Key: key, OldObj: nil, Ope: state.Create}
c.workqueue.AddRateLimited(rqo)
}
@@ -177,6 +183,9 @@ func (c *ApisixTlsController) updateFunc(oldObj, newObj
interface{}) {
runtime.HandleError(err)
return
}
+ if !c.controller.namespaceWatching(key) {
+ return
+ }
rqo := &TlsQueueObj{Key: key, OldObj: oldTls, Ope: state.Update}
c.workqueue.AddRateLimited(rqo)
}
@@ -200,6 +209,9 @@ func (c *ApisixTlsController) deleteFunc(obj interface{}) {
runtime.HandleError(err)
return
}
+ if !c.controller.namespaceWatching(key) {
+ return
+ }
rqo := &TlsQueueObj{Key: key, OldObj: oldTls, Ope: state.Delete}
c.workqueue.AddRateLimited(rqo)
}
diff --git a/pkg/ingress/controller/apisix_upstream.go
b/pkg/ingress/controller/apisix_upstream.go
index 71e1ca9..2e43fe9 100644
--- a/pkg/ingress/controller/apisix_upstream.go
+++ b/pkg/ingress/controller/apisix_upstream.go
@@ -22,7 +22,7 @@ import (
clientSet
"github.com/gxthrj/apisix-ingress-types/pkg/client/clientset/versioned"
apisixScheme
"github.com/gxthrj/apisix-ingress-types/pkg/client/clientset/versioned/scheme"
informers
"github.com/gxthrj/apisix-ingress-types/pkg/client/informers/externalversions/config/v1"
- "github.com/gxthrj/apisix-ingress-types/pkg/client/listers/config/v1"
+ v1 "github.com/gxthrj/apisix-ingress-types/pkg/client/listers/config/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
@@ -38,6 +38,7 @@ import (
)
type ApisixUpstreamController struct {
+ controller *Controller
kubeclientset kubernetes.Interface
apisixClientset clientSet.Interface
apisixUpstreamList v1.ApisixUpstreamLister
@@ -48,10 +49,12 @@ type ApisixUpstreamController struct {
func BuildApisixUpstreamController(
kubeclientset kubernetes.Interface,
apisixUpstreamClientset clientSet.Interface,
- apisixUpstreamInformer informers.ApisixUpstreamInformer)
*ApisixUpstreamController {
+ apisixUpstreamInformer informers.ApisixUpstreamInformer,
+ root *Controller) *ApisixUpstreamController {
runtime.Must(apisixScheme.AddToScheme(scheme.Scheme))
controller := &ApisixUpstreamController{
+ controller: root,
kubeclientset: kubeclientset,
apisixClientset: apisixUpstreamClientset,
apisixUpstreamList: apisixUpstreamInformer.Lister(),
@@ -161,6 +164,9 @@ func (c *ApisixUpstreamController) addFunc(obj interface{})
{
runtime.HandleError(err)
return
}
+ if !c.controller.namespaceWatching(key) {
+ return
+ }
sqo := &UpstreamQueueObj{Key: key, OldObj: nil, Ope: ADD}
c.workqueue.AddRateLimited(sqo)
}
@@ -202,6 +208,9 @@ func (c *ApisixUpstreamController) deleteFunc(obj
interface{}) {
runtime.HandleError(err)
return
}
+ if !c.controller.namespaceWatching(key) {
+ return
+ }
sqo := &UpstreamQueueObj{Key: key, OldObj: oldUpstream, Ope: DELETE}
c.workqueue.AddRateLimited(sqo)
}
diff --git a/pkg/ingress/controller/controller.go
b/pkg/ingress/controller/controller.go
index 7cf38ed..2114ec2 100644
--- a/pkg/ingress/controller/controller.go
+++ b/pkg/ingress/controller/controller.go
@@ -18,6 +18,10 @@ import (
"os"
"sync"
+ v1 "k8s.io/api/core/v1"
+
+ "k8s.io/client-go/tools/cache"
+
"github.com/api7/ingress-controller/pkg/apisix"
clientSet
"github.com/gxthrj/apisix-ingress-types/pkg/client/clientset/versioned"
@@ -44,6 +48,7 @@ func recoverException() {
// Controller is the ingress apisix controller object.
type Controller struct {
wg sync.WaitGroup
+ watchingNamespace map[string]struct{}
apiServer *api.Server
clientset kubernetes.Interface
crdClientset crdclientset.Interface
@@ -82,12 +87,21 @@ func NewController(cfg *config.Config) (*Controller, error)
{
crdClientset := kube.GetApisixClient()
sharedInformerFactory :=
externalversions.NewSharedInformerFactory(crdClientset,
cfg.Kubernetes.ResyncInterval.Duration)
+ var watchingNamespace map[string]struct{}
+ if len(cfg.Kubernetes.AppNamespaces) > 1 ||
cfg.Kubernetes.AppNamespaces[0] != v1.NamespaceAll {
+ watchingNamespace = make(map[string]struct{},
len(cfg.Kubernetes.AppNamespaces))
+ for _, ns := range cfg.Kubernetes.AppNamespaces {
+ watchingNamespace[ns] = struct{}{}
+ }
+ }
+
c := &Controller{
apiServer: apiSrv,
metricsCollector: metrics.NewPrometheusCollector(podName,
podNamespace),
clientset: kube.GetKubeClient(),
crdClientset: crdClientset,
crdInformerFactory: sharedInformerFactory,
+ watchingNamespace: watchingNamespace,
}
return c, nil
@@ -117,7 +131,7 @@ func (c *Controller) Run(stop chan struct{}) error {
epInformer := ac.CoreSharedInformerFactory.Core().V1().Endpoints()
kube.EndpointsInformer = epInformer
// endpoint
- ac.Endpoint()
+ ac.Endpoint(c)
c.goAttach(func() {
ac.CoreSharedInformerFactory.Start(stop)
})
@@ -128,13 +142,13 @@ func (c *Controller) Run(stop chan struct{}) error {
})
// ApisixRoute
- ac.ApisixRoute()
+ ac.ApisixRoute(c)
// ApisixUpstream
- ac.ApisixUpstream()
+ ac.ApisixUpstream(c)
// ApisixService
- ac.ApisixService()
+ ac.ApisixService(c)
// ApisixTLS
- ac.ApisixTLS()
+ ac.ApisixTLS(c)
c.goAttach(func() {
ac.SharedInformerFactory.Start(stop)
@@ -145,6 +159,24 @@ func (c *Controller) Run(stop chan struct{}) error {
return nil
}
+// namespaceWatching accepts a resource key, getting the namespace part
+// and checking whether the namespace is being watched.
+func (c *Controller) namespaceWatching(key string) (ok bool) {
+ if c.watchingNamespace == nil {
+ ok = true
+ return
+ }
+ ns, _, err := cache.SplitMetaNamespaceKey(key)
+ if err != nil {
+ // Ignore resource with invalid key.
+ ok = false
+ log.Warnf("resource %s was ignored since: %s", key, err)
+ return
+ }
+ _, ok = c.watchingNamespace[ns]
+ return
+}
+
type Api6Controller struct {
KubeClientSet kubernetes.Interface
Api6ClientSet clientSet.Interface
@@ -153,40 +185,44 @@ type Api6Controller struct {
Stop chan struct{}
}
-func (api6 *Api6Controller) ApisixRoute() {
+func (api6 *Api6Controller) ApisixRoute(controller *Controller) {
arc := BuildApisixRouteController(
api6.KubeClientSet,
api6.Api6ClientSet,
- api6.SharedInformerFactory.Apisix().V1().ApisixRoutes())
+ api6.SharedInformerFactory.Apisix().V1().ApisixRoutes(),
+ controller)
arc.Run(api6.Stop)
}
-func (api6 *Api6Controller) ApisixUpstream() {
+func (api6 *Api6Controller) ApisixUpstream(controller *Controller) {
auc := BuildApisixUpstreamController(
api6.KubeClientSet,
api6.Api6ClientSet,
- api6.SharedInformerFactory.Apisix().V1().ApisixUpstreams())
+ api6.SharedInformerFactory.Apisix().V1().ApisixUpstreams(),
+ controller)
auc.Run(api6.Stop)
}
-func (api6 *Api6Controller) ApisixService() {
+func (api6 *Api6Controller) ApisixService(controller *Controller) {
auc := BuildApisixServiceController(
api6.KubeClientSet,
api6.Api6ClientSet,
- api6.SharedInformerFactory.Apisix().V1().ApisixServices())
+ api6.SharedInformerFactory.Apisix().V1().ApisixServices(),
+ controller)
auc.Run(api6.Stop)
}
-func (api6 *Api6Controller) ApisixTLS() {
+func (api6 *Api6Controller) ApisixTLS(controller *Controller) {
auc := BuildApisixTlsController(
api6.KubeClientSet,
api6.Api6ClientSet,
- api6.SharedInformerFactory.Apisix().V1().ApisixTlses())
+ api6.SharedInformerFactory.Apisix().V1().ApisixTlses(),
+ controller)
auc.Run(api6.Stop)
}
-func (api6 *Api6Controller) Endpoint() {
- auc := BuildEndpointController(api6.KubeClientSet)
+func (api6 *Api6Controller) Endpoint(controller *Controller) {
+ auc := BuildEndpointController(api6.KubeClientSet, controller)
//conf.EndpointsInformer)
auc.Run(api6.Stop)
}
diff --git a/pkg/ingress/controller/endpoint.go
b/pkg/ingress/controller/endpoint.go
index 396e2a4..6c7a7e1 100644
--- a/pkg/ingress/controller/endpoint.go
+++ b/pkg/ingress/controller/endpoint.go
@@ -38,14 +38,16 @@ import (
)
type EndpointController struct {
+ controller *Controller
kubeclientset kubernetes.Interface
endpointList CoreListerV1.EndpointsLister
endpointSynced cache.InformerSynced
workqueue workqueue.RateLimitingInterface
}
-func BuildEndpointController(kubeclientset kubernetes.Interface)
*EndpointController {
+func BuildEndpointController(kubeclientset kubernetes.Interface, root
*Controller) *EndpointController {
controller := &EndpointController{
+ controller: root,
kubeclientset: kubeclientset,
endpointList: kube.EndpointsInformer.Lister(),
endpointSynced: kube.EndpointsInformer.Informer().HasSynced,
@@ -215,6 +217,9 @@ func (c *EndpointController) addFunc(obj interface{}) {
runtime.HandleError(err)
return
}
+ if !c.controller.namespaceWatching(key) {
+ return
+ }
c.workqueue.AddRateLimited(key)
}
@@ -235,5 +240,8 @@ func (c *EndpointController) deleteFunc(obj interface{}) {
runtime.HandleError(err)
return
}
+ if !c.controller.namespaceWatching(key) {
+ return
+ }
c.workqueue.AddRateLimited(key)
}
diff --git a/test/e2e/ingress/namespace.go b/test/e2e/ingress/namespace.go
new file mode 100644
index 0000000..23601ff
--- /dev/null
+++ b/test/e2e/ingress/namespace.go
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package ingress
+
+import (
+ "encoding/json"
+ "fmt"
+ "net/http"
+
+ "github.com/api7/ingress-controller/test/e2e/scaffold"
+ "github.com/onsi/ginkgo"
+ "github.com/stretchr/testify/assert"
+)
+
+var _ = ginkgo.Describe("namespacing filtering", func() {
+ s := scaffold.NewDefaultScaffold()
+ ginkgo.It("resources in other namespaces should be ignored", func() {
+ backendSvc, backendSvcPort := s.DefaultHTTPBackend()
+ route := fmt.Sprintf(`
+apiVersion: apisix.apache.org/v1
+kind: ApisixRoute
+metadata:
+ name: httpbin-route
+spec:
+ rules:
+ - host: httpbin.com
+ http:
+ paths:
+ - backend:
+ serviceName: %s
+ servicePort: %d
+ path: /ip
+`, backendSvc, backendSvcPort[0])
+
+ assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(route),
"creating ApisixRoute")
+ assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixRoutesCreated(1),
"checking number of routes")
+ assert.Nil(ginkgo.GinkgoT(),
s.EnsureNumApisixUpstreamsCreated(1), "checking number of upstreams")
+
+ body := s.NewAPISIXClient().GET("/ip").WithHeader("Host",
"httpbin.com").Expect().Status(http.StatusOK).Body().Raw()
+ var placeholder ip
+ err := json.Unmarshal([]byte(body), &placeholder)
+ assert.Nil(ginkgo.GinkgoT(), err, "unmarshalling IP")
+
+ // Now create another ApisixRoute in default namespace.
+ route = fmt.Sprintf(`
+apiVersion: apisix.apache.org/v1
+kind: ApisixRoute
+metadata:
+ name: httpbin-route
+spec:
+ rules:
+ - host: httpbin.com
+ http:
+ paths:
+ - backend:
+ serviceName: %s
+ servicePort: %d
+ path: /headers
+`, backendSvc, backendSvcPort[0])
+
+ assert.Nil(ginkgo.GinkgoT(),
s.CreateResourceFromStringWithNamespace(route, "default"), "creating
ApisixRoute")
+ _ = s.NewAPISIXClient().GET("/headers").WithHeader("Host",
"httpbin.com").Expect().Status(http.StatusNotFound)
+ })
+})
diff --git a/test/e2e/ingress/resourcepushing.go
b/test/e2e/ingress/resourcepushing.go
index 572e342..2ffba3f 100644
--- a/test/e2e/ingress/resourcepushing.go
+++ b/test/e2e/ingress/resourcepushing.go
@@ -15,6 +15,7 @@
package ingress
import (
+ "fmt"
"time"
"github.com/onsi/ginkgo"
@@ -26,7 +27,8 @@ import (
var _ = ginkgo.Describe("ApisixRoute Testing", func() {
s := scaffold.NewDefaultScaffold()
ginkgo.It("create and then scale upstream pods to 2 ", func() {
- apisixRoute := `
+ backendSvc, backendSvcPort := s.DefaultHTTPBackend()
+ apisixRoute := fmt.Sprintf(`
apiVersion: apisix.apache.org/v1
kind: ApisixRoute
metadata:
@@ -37,11 +39,11 @@ spec:
http:
paths:
- backend:
- serviceName: httpbin-service-e2e-test
- servicePort: 80
+ serviceName: %s
+ servicePort: %d
path: /ip
-`
- s.CreateApisixRouteByString(apisixRoute)
+`, backendSvc, backendSvcPort[0])
+ assert.Nil(ginkgo.GinkgoT(),
s.CreateResourceFromString(apisixRoute))
err := s.EnsureNumApisixRoutesCreated(1)
assert.Nil(ginkgo.GinkgoT(), err, "Checking number of routes")
@@ -50,14 +52,15 @@ spec:
scale := 2
err = s.ScaleHTTPBIN(scale)
assert.Nil(ginkgo.GinkgoT(), err)
- time.Sleep(10 * time.Second) // wait for ingress to sync
+ time.Sleep(5 * time.Second) // wait for ingress to sync
ups, err := s.ListApisixUpstreams()
assert.Nil(ginkgo.GinkgoT(), err, "list upstreams error")
assert.Len(ginkgo.GinkgoT(), ups[0].Nodes, 2, "upstreams nodes
not expect")
})
- ginkgo.It("create and then remove ", func() {
- apisixRoute := `
+ ginkgo.It("create and then remove", func() {
+ backendSvc, backendSvcPort := s.DefaultHTTPBackend()
+ apisixRoute := fmt.Sprintf(`
apiVersion: apisix.apache.org/v1
kind: ApisixRoute
metadata:
@@ -68,25 +71,22 @@ spec:
http:
paths:
- backend:
- serviceName: httpbin-service-e2e-test
- servicePort: 80
+ serviceName: %s
+ servicePort: %d
path: /ip
-`
- s.CreateApisixRouteByString(apisixRoute)
+`, backendSvc, backendSvcPort[0])
+ assert.Nil(ginkgo.GinkgoT(),
s.CreateResourceFromString(apisixRoute), "creating ApisixRoute")
err := s.EnsureNumApisixRoutesCreated(1)
assert.Nil(ginkgo.GinkgoT(), err, "Checking number of routes")
err = s.EnsureNumApisixUpstreamsCreated(1)
assert.Nil(ginkgo.GinkgoT(), err, "Checking number of
upstreams")
- ups, err := s.ListApisixUpstreams()
- assert.Nil(ginkgo.GinkgoT(), err, "list upstreams error")
- assert.Len(ginkgo.GinkgoT(), ups[0].Nodes, 1, "upstreams nodes
not expect")
// remove
- s.CreateApisixRouteByString(apisixRoute)
- time.Sleep(5 * time.Second) // wait for ingress to sync
- ups, err = s.ListApisixUpstreams()
+ assert.Nil(ginkgo.GinkgoT(),
s.RemoveResourceByString(apisixRoute))
+ time.Sleep(10 * time.Second) // wait for ingress to sync
+ ups, err := s.ListApisixUpstreams()
assert.Nil(ginkgo.GinkgoT(), err, "list upstreams error")
- assert.Len(ginkgo.GinkgoT(), len(ups), 0, "upstreams nodes not
expect")
+ assert.Len(ginkgo.GinkgoT(), ups, 0, "upstreams nodes not
expect")
})
})
diff --git a/test/e2e/scaffold/crd.go b/test/e2e/scaffold/crd.go
index 29338ae..b955270 100644
--- a/test/e2e/scaffold/crd.go
+++ b/test/e2e/scaffold/crd.go
@@ -91,12 +91,33 @@ func (s *Scaffold) CreateApisixRoute(name string, rules
[]ApisixRouteRule) {
k8s.KubectlApplyFromString(s.t, s.kubectlOptions, string(data))
}
-func (s *Scaffold) CreateApisixRouteByString(yaml string) {
- k8s.KubectlApplyFromString(s.t, s.kubectlOptions, yaml)
+// CreateResourceFromString creates resource from a loaded yaml string.
+func (s *Scaffold) CreateResourceFromString(yaml string) error {
+ return k8s.KubectlApplyFromStringE(s.t, s.kubectlOptions, yaml)
+}
+// RemoveResourceByString remove resource from a loaded yaml string.
+func (s *Scaffold) RemoveResourceByString(yaml string) error {
+ return k8s.KubectlDeleteFromStringE(s.t, s.kubectlOptions, yaml)
}
-func (s *Scaffold) RemoveApisixRouteByString(yaml string) {
- k8s.KubectlDeleteFromString(s.t, s.kubectlOptions, yaml)
+
+// CreateResourceFromStringWithNamespace creates resource from a loaded yaml
string
+// and sets its namespace to the sepcified one.
+func (s *Scaffold) CreateResourceFromStringWithNamespace(yaml, namespace
string) error {
+ originalNamespace := s.kubectlOptions.Namespace
+ s.kubectlOptions.Namespace = namespace
+ defer func() {
+ s.kubectlOptions.Namespace = originalNamespace
+ }()
+ s.addFinializer(func() {
+ originalNamespace := s.kubectlOptions.Namespace
+ s.kubectlOptions.Namespace = namespace
+ defer func() {
+ s.kubectlOptions.Namespace = originalNamespace
+ }()
+ assert.Nil(s.t, k8s.KubectlDeleteFromStringE(s.t,
s.kubectlOptions, yaml))
+ })
+ return k8s.KubectlApplyFromStringE(s.t, s.kubectlOptions, yaml)
}
func ensureNumApisixCRDsCreated(url string, desired int) error {
diff --git a/test/e2e/scaffold/ingress.go b/test/e2e/scaffold/ingress.go
index 361908b..9a6de56 100644
--- a/test/e2e/scaffold/ingress.go
+++ b/test/e2e/scaffold/ingress.go
@@ -29,7 +29,7 @@ const (
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
- name: ingress-apisix-e2e-test-clusterrolebinding
+ name: %s-clusterrolebinding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
@@ -39,7 +39,7 @@ subjects:
name: ingress-apisix-e2e-test-service-account
namespace: %s
`
- _ingressAPISIXDeployment = `
+ _ingressAPISIXDeploymentTemplate = `
apiVersion: apps/v1
kind: Deployment
metadata:
@@ -51,7 +51,7 @@ spec:
app: ingress-apisix-controller-deployment-e2e-test
strategy:
rollingUpdate:
- maxSurge: 50%
+ maxSurge: 50%%
maxUnavailable: 1
type: RollingUpdate
template:
@@ -78,7 +78,7 @@ spec:
port: 8080
timeoutSeconds: 2
image: "apache/apisix-ingress-controller:dev"
- imagePullPolicy: IfNotPresent
+ imagePullPolicy: Never
name: ingress-apisix-controller-deployment-e2e-test
ports:
- containerPort: 8080
@@ -95,20 +95,23 @@ spec:
- :8080
- --apisix-base-url
- http://apisix-service-e2e-test:9180/apisix/admin
+ - --app-namespace
+ - %s
serviceAccount: ingress-apisix-e2e-test-service-account
`
)
func (s *Scaffold) newIngressAPISIXController() error {
+ ingressAPISIXDeployment :=
fmt.Sprintf(_ingressAPISIXDeploymentTemplate, s.namespace)
if err := k8s.CreateServiceAccountE(s.t, s.kubectlOptions,
_serviceAccount); err != nil {
return err
}
- crb := fmt.Sprintf(_clusterRoleBinding, s.namespace)
+ crb := fmt.Sprintf(_clusterRoleBinding, s.namespace, s.namespace)
if err := k8s.KubectlApplyFromStringE(s.t, s.kubectlOptions, crb); err
!= nil {
return err
}
- if err := k8s.KubectlApplyFromStringE(s.t, s.kubectlOptions,
_ingressAPISIXDeployment); err != nil {
+ if err := k8s.KubectlApplyFromStringE(s.t, s.kubectlOptions,
ingressAPISIXDeployment); err != nil {
return err
}
return nil
diff --git a/test/e2e/scaffold/scaffold.go b/test/e2e/scaffold/scaffold.go
index 6a5a711..fdd1998 100644
--- a/test/e2e/scaffold/scaffold.go
+++ b/test/e2e/scaffold/scaffold.go
@@ -53,6 +53,7 @@ type Scaffold struct {
apisixService *corev1.Service
httpbinDeployment *appsv1.Deployment
httpbinService *corev1.Service
+ finializers []func()
// Used for template rendering.
EtcdServiceFQDN string
@@ -133,14 +134,6 @@ func (s *Scaffold) NewAPISIXClient() *httpexpect.Expect {
})
}
-func (s *Scaffold) BeforeEach() {
- s.beforeEach()
-}
-
-func (s *Scaffold) AfterEach() {
- s.afterEach()
-}
-
func (s *Scaffold) beforeEach() {
var err error
s.namespace = fmt.Sprintf("ingress-apisix-e2e-tests-%s-%d",
s.opts.Name, time.Now().Nanosecond())
@@ -157,7 +150,6 @@ func (s *Scaffold) beforeEach() {
assert.Nil(s.t, err, "initializing etcd")
// We don't use k8s.WaitUntilServiceAvailable since it hacks for
Minikube.
- //err = s.waitAllEtcdPodsAvailable()
err = k8s.WaitUntilNumPodsCreatedE(s.t, s.kubectlOptions,
s.labelSelector("app=etcd-deployment-e2e-test"), 1, 5, 2*time.Second)
assert.Nil(s.t, err, "waiting for etcd ready")
@@ -182,8 +174,16 @@ func (s *Scaffold) beforeEach() {
func (s *Scaffold) afterEach() {
defer ginkgo.GinkgoRecover()
- //err := k8s.DeleteNamespaceE(s.t, s.kubectlOptions, s.namespace)
- //assert.Nilf(ginkgo.GinkgoT(), err, "deleting namespace %s",
s.namespace)
+ err := k8s.DeleteNamespaceE(s.t, s.kubectlOptions, s.namespace)
+ assert.Nilf(ginkgo.GinkgoT(), err, "deleting namespace %s", s.namespace)
+
+ for _, f := range s.finializers {
+ f()
+ }
+}
+
+func (s *Scaffold) addFinializer(f func()) {
+ s.finializers = append(s.finializers, f)
}
func (s *Scaffold) renderConfig(path string) (string, error) {