This is an automated email from the ASF dual-hosted git repository.
zhangjintao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push:
new 6394cdd1 feat: sync CRD and ingress resource to APISIX mechanism.
(#1022)
6394cdd1 is described below
commit 6394cdd11e96f205756bb8c19eaa047bdc114774
Author: Xin Rong <[email protected]>
AuthorDate: Wed Jun 22 23:29:29 2022 +0800
feat: sync CRD and ingress resource to APISIX mechanism. (#1022)
---
cmd/ingress/ingress.go | 1 +
conf/config-default.yaml | 2 +-
pkg/config/config.go | 42 ++---
pkg/config/config_test.go | 41 +++--
pkg/ingress/apisix_cluster_config.go | 26 +++
pkg/ingress/apisix_consumer.go | 26 +++
pkg/ingress/apisix_pluginconfig.go | 22 +++
pkg/ingress/apisix_route.go | 22 +++
pkg/ingress/apisix_tls.go | 26 +++
pkg/ingress/apisix_upstream.go | 18 ++
pkg/ingress/controller.go | 60 ++++++-
pkg/ingress/ingress.go | 22 +++
samples/deploy/configmap/apisix-ingress-cm.yaml | 2 +-
test/e2e/scaffold/ingress.go | 11 +-
test/e2e/scaffold/k8s.go | 22 ++-
test/e2e/scaffold/scaffold.go | 4 +
test/e2e/suite-ingress/resourcesync.go | 227 ++++++++++++++++++++++++
17 files changed, 528 insertions(+), 46 deletions(-)
diff --git a/cmd/ingress/ingress.go b/cmd/ingress/ingress.go
index 324ff7b1..33005e32 100644
--- a/cmd/ingress/ingress.go
+++ b/cmd/ingress/ingress.go
@@ -171,6 +171,7 @@ For example, no available LB exists in the bare metal
environment.`)
cmd.PersistentFlags().StringVar(&cfg.APISIX.DefaultClusterBaseURL,
"default-apisix-cluster-base-url", "", "the base URL of admin api / manager api
for the default APISIX cluster")
cmd.PersistentFlags().StringVar(&cfg.APISIX.DefaultClusterAdminKey,
"default-apisix-cluster-admin-key", "", "admin key used for the authorization
of admin api / manager api for the default APISIX cluster")
cmd.PersistentFlags().StringVar(&cfg.APISIX.DefaultClusterName,
"default-apisix-cluster-name", "default", "name of the default apisix cluster")
+
cmd.PersistentFlags().DurationVar(&cfg.ApisixResourceSyncInterval.Duration,
"apisix-resource-sync-interval", 300*time.Second, "interval between syncs in
seconds. Default value is 300s.")
if err := cmd.PersistentFlags().MarkDeprecated("app-namespace", "use
namespace-selector instead"); err != nil {
dief("failed to mark `app-namespace` as deprecated: %s", err)
diff --git a/conf/config-default.yaml b/conf/config-default.yaml
index 5e05390a..55327366 100644
--- a/conf/config-default.yaml
+++ b/conf/config-default.yaml
@@ -44,7 +44,7 @@ ingress_status_address: [] # when there is no available
information on the Ser
# For example, no available LB exists in the bare
metal environment.
enable_profiling: true # enable profiling via web interfaces
# host:port/debug/pprof, default is true.
-
+apisix-resource-sync-interval: "300s" # Default interval for synchronizing
Kubernetes resources to APISIX
# Kubernetes related configurations.
kubernetes:
kubeconfig: "" # the Kubernetes configuration file
path, default is
diff --git a/pkg/config/config.go b/pkg/config/config.go
index af6d3665..d5f6f350 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -72,17 +72,18 @@ const (
// Config contains all config items which are necessary for
// apisix-ingress-controller's running.
type Config struct {
- CertFilePath string `json:"cert_file"
yaml:"cert_file"`
- KeyFilePath string `json:"key_file" yaml:"key_file"`
- LogLevel string `json:"log_level"
yaml:"log_level"`
- LogOutput string `json:"log_output"
yaml:"log_output"`
- HTTPListen string `json:"http_listen"
yaml:"http_listen"`
- HTTPSListen string `json:"https_listen"
yaml:"https_listen"`
- IngressPublishService string `json:"ingress_publish_service"
yaml:"ingress_publish_service"`
- IngressStatusAddress []string `json:"ingress_status_address"
yaml:"ingress_status_address"`
- EnableProfiling bool `json:"enable_profiling"
yaml:"enable_profiling"`
- Kubernetes KubernetesConfig `json:"kubernetes"
yaml:"kubernetes"`
- APISIX APISIXConfig `json:"apisix" yaml:"apisix"`
+ CertFilePath string `json:"cert_file"
yaml:"cert_file"`
+ KeyFilePath string `json:"key_file"
yaml:"key_file"`
+ LogLevel string `json:"log_level"
yaml:"log_level"`
+ LogOutput string `json:"log_output"
yaml:"log_output"`
+ HTTPListen string `json:"http_listen"
yaml:"http_listen"`
+ HTTPSListen string `json:"https_listen"
yaml:"https_listen"`
+ IngressPublishService string
`json:"ingress_publish_service" yaml:"ingress_publish_service"`
+ IngressStatusAddress []string
`json:"ingress_status_address" yaml:"ingress_status_address"`
+ EnableProfiling bool `json:"enable_profiling"
yaml:"enable_profiling"`
+ Kubernetes KubernetesConfig `json:"kubernetes"
yaml:"kubernetes"`
+ APISIX APISIXConfig `json:"apisix"
yaml:"apisix"`
+ ApisixResourceSyncInterval types.TimeDuration
`json:"apisix-resource-sync-interval" yaml:"apisix-resource-sync-interval"`
}
// KubernetesConfig contains all Kubernetes related config items.
@@ -118,15 +119,16 @@ type APISIXConfig struct {
// default value.
func NewDefaultConfig() *Config {
return &Config{
- LogLevel: "warn",
- LogOutput: "stderr",
- HTTPListen: ":8080",
- HTTPSListen: ":8443",
- IngressPublishService: "",
- IngressStatusAddress: []string{},
- CertFilePath: "/etc/webhook/certs/cert.pem",
- KeyFilePath: "/etc/webhook/certs/key.pem",
- EnableProfiling: true,
+ LogLevel: "warn",
+ LogOutput: "stderr",
+ HTTPListen: ":8080",
+ HTTPSListen: ":8443",
+ IngressPublishService: "",
+ IngressStatusAddress: []string{},
+ CertFilePath: "/etc/webhook/certs/cert.pem",
+ KeyFilePath: "/etc/webhook/certs/key.pem",
+ EnableProfiling: true,
+ ApisixResourceSyncInterval: types.TimeDuration{Duration: 300 *
time.Second},
Kubernetes: KubernetesConfig{
Kubeconfig: "", // Use in-cluster
configurations.
ResyncInterval:
types.TimeDuration{Duration: 6 * time.Hour},
diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go
index aa0a3077..cdca004a 100644
--- a/pkg/config/config_test.go
+++ b/pkg/config/config_test.go
@@ -28,15 +28,16 @@ import (
func TestNewConfigFromFile(t *testing.T) {
cfg := &Config{
- LogLevel: "warn",
- LogOutput: "stdout",
- HTTPListen: ":9090",
- HTTPSListen: ":9443",
- IngressPublishService: "",
- IngressStatusAddress: []string{},
- CertFilePath: "/etc/webhook/certs/cert.pem",
- KeyFilePath: "/etc/webhook/certs/key.pem",
- EnableProfiling: true,
+ LogLevel: "warn",
+ LogOutput: "stdout",
+ HTTPListen: ":9090",
+ HTTPSListen: ":9443",
+ IngressPublishService: "",
+ IngressStatusAddress: []string{},
+ CertFilePath: "/etc/webhook/certs/cert.pem",
+ KeyFilePath: "/etc/webhook/certs/key.pem",
+ EnableProfiling: true,
+ ApisixResourceSyncInterval: types.TimeDuration{Duration: 200 *
time.Second},
Kubernetes: KubernetesConfig{
ResyncInterval:
types.TimeDuration{Duration: time.Hour},
Kubeconfig: "/path/to/foo/baz",
@@ -86,6 +87,7 @@ https_listen: :9443
ingress_publish_service: ""
ingress_status_address: []
enable_profiling: true
+apisix-resource-sync-interval: 200s
kubernetes:
kubeconfig: /path/to/foo/baz
resync_interval: 1h0m0s
@@ -113,15 +115,16 @@ apisix:
func TestConfigWithEnvVar(t *testing.T) {
cfg := &Config{
- LogLevel: "warn",
- LogOutput: "stdout",
- HTTPListen: ":9090",
- HTTPSListen: ":9443",
- IngressPublishService: "",
- IngressStatusAddress: []string{},
- CertFilePath: "/etc/webhook/certs/cert.pem",
- KeyFilePath: "/etc/webhook/certs/key.pem",
- EnableProfiling: true,
+ LogLevel: "warn",
+ LogOutput: "stdout",
+ HTTPListen: ":9090",
+ HTTPSListen: ":9443",
+ IngressPublishService: "",
+ IngressStatusAddress: []string{},
+ CertFilePath: "/etc/webhook/certs/cert.pem",
+ KeyFilePath: "/etc/webhook/certs/key.pem",
+ EnableProfiling: true,
+ ApisixResourceSyncInterval: types.TimeDuration{Duration: 200 *
time.Second},
Kubernetes: KubernetesConfig{
ResyncInterval:
types.TimeDuration{Duration: time.Hour},
Kubeconfig: "",
@@ -160,6 +163,7 @@ func TestConfigWithEnvVar(t *testing.T) {
"ingress_publish_service": "",
"ingress_status_address": [],
"enable_profiling": true,
+ "apisix-resource-sync-interval": "200s",
"kubernetes": {
"kubeconfig": "{{.KUBECONFIG}}",
"resync_interval": "1h0m0s",
@@ -195,6 +199,7 @@ https_listen: :9443
ingress_publish_service: ""
ingress_status_address: []
enable_profiling: true
+apisix-resource-sync-interval: 200s
kubernetes:
resync_interval: 1h0m0s
kubeconfig: "{{.KUBECONFIG}}"
diff --git a/pkg/ingress/apisix_cluster_config.go
b/pkg/ingress/apisix_cluster_config.go
index ecc33d0e..fa346381 100644
--- a/pkg/ingress/apisix_cluster_config.go
+++ b/pkg/ingress/apisix_cluster_config.go
@@ -403,3 +403,29 @@ func (c *apisixClusterConfigController) onDelete(obj
interface{}) {
c.controller.MetricsCollector.IncrEvents("clusterConfig", "delete")
}
+
+func (c *apisixClusterConfigController) ResourceSync() {
+ objs := c.controller.apisixClusterConfigInformer.GetIndexer().List()
+ for _, obj := range objs {
+ key, err := cache.MetaNamespaceKeyFunc(obj)
+ if err != nil {
+ log.Errorw("ApisixClusterConfig sync failed, found
ApisixClusterConfig resource with bad meta namespace key", zap.String("error",
err.Error()))
+ continue
+ }
+ if !c.controller.isWatchingNamespace(key) {
+ continue
+ }
+ acc, err := kube.NewApisixClusterConfig(obj)
+ if err != nil {
+ log.Errorw("found ApisixClusterConfig resource with bad
type", zap.String("error", err.Error()))
+ return
+ }
+ c.workqueue.Add(&types.Event{
+ Type: types.EventAdd,
+ Object: kube.ApisixClusterConfigEvent{
+ Key: key,
+ GroupVersion: acc.GroupVersion(),
+ },
+ })
+ }
+}
diff --git a/pkg/ingress/apisix_consumer.go b/pkg/ingress/apisix_consumer.go
index 8a456a70..581239dd 100644
--- a/pkg/ingress/apisix_consumer.go
+++ b/pkg/ingress/apisix_consumer.go
@@ -318,3 +318,29 @@ func (c *apisixConsumerController) onDelete(obj
interface{}) {
c.controller.MetricsCollector.IncrEvents("consumer", "delete")
}
+
+func (c *apisixConsumerController) ResourceSync() {
+ objs := c.controller.apisixConsumerInformer.GetIndexer().List()
+ for _, obj := range objs {
+ key, err := cache.MetaNamespaceKeyFunc(obj)
+ if err != nil {
+ log.Errorw("ApisixConsumer sync failed, found
ApisixConsumer resource with bad meta namespace key", zap.String("error",
err.Error()))
+ continue
+ }
+ if !c.controller.isWatchingNamespace(key) {
+ continue
+ }
+ ac, err := kube.NewApisixConsumer(obj)
+ if err != nil {
+ log.Errorw("found ApisixConsumer resource with bad
type", zap.String("error", err.Error()))
+ return
+ }
+ c.workqueue.Add(&types.Event{
+ Type: types.EventAdd,
+ Object: kube.ApisixConsumerEvent{
+ Key: key,
+ GroupVersion: ac.GroupVersion(),
+ },
+ })
+ }
+}
diff --git a/pkg/ingress/apisix_pluginconfig.go
b/pkg/ingress/apisix_pluginconfig.go
index 3faf885b..fa7e64a7 100644
--- a/pkg/ingress/apisix_pluginconfig.go
+++ b/pkg/ingress/apisix_pluginconfig.go
@@ -367,3 +367,25 @@ func (c *apisixPluginConfigController) onDelete(obj
interface{}) {
c.controller.MetricsCollector.IncrEvents("PluginConfig", "delete")
}
+
+func (c *apisixPluginConfigController) ResourceSync() {
+ objs := c.controller.apisixPluginConfigInformer.GetIndexer().List()
+ for _, obj := range objs {
+ key, err := cache.MetaNamespaceKeyFunc(obj)
+ if err != nil {
+ log.Errorw("ApisixPluginConfig sync failed, found
ApisixPluginConfig resource with bad meta namespace key", zap.String("error",
err.Error()))
+ continue
+ }
+ if !c.controller.isWatchingNamespace(key) {
+ continue
+ }
+ apc := kube.MustNewApisixPluginConfig(obj)
+ c.workqueue.Add(&types.Event{
+ Type: types.EventAdd,
+ Object: kube.ApisixPluginConfigEvent{
+ Key: key,
+ GroupVersion: apc.GroupVersion(),
+ },
+ })
+ }
+}
diff --git a/pkg/ingress/apisix_route.go b/pkg/ingress/apisix_route.go
index c2ecee33..a2f95374 100644
--- a/pkg/ingress/apisix_route.go
+++ b/pkg/ingress/apisix_route.go
@@ -449,3 +449,25 @@ func (c *apisixRouteController) onDelete(obj interface{}) {
c.controller.MetricsCollector.IncrEvents("route", "delete")
}
+
+func (c *apisixRouteController) ResourceSync() {
+ objs := c.controller.apisixRouteInformer.GetIndexer().List()
+ for _, obj := range objs {
+ key, err := cache.MetaNamespaceKeyFunc(obj)
+ if err != nil {
+ log.Errorw("ApisixRoute sync failed, found ApisixRoute
resource with bad meta namespace key", zap.String("error", err.Error()))
+ continue
+ }
+ if !c.controller.isWatchingNamespace(key) {
+ continue
+ }
+ ar := kube.MustNewApisixRoute(obj)
+ c.workqueue.Add(&types.Event{
+ Type: types.EventAdd,
+ Object: kube.ApisixRouteEvent{
+ Key: key,
+ GroupVersion: ar.GroupVersion(),
+ },
+ })
+ }
+}
diff --git a/pkg/ingress/apisix_tls.go b/pkg/ingress/apisix_tls.go
index 66bc8442..be484edc 100644
--- a/pkg/ingress/apisix_tls.go
+++ b/pkg/ingress/apisix_tls.go
@@ -359,3 +359,29 @@ func (c *apisixTlsController) onDelete(obj interface{}) {
c.controller.MetricsCollector.IncrEvents("TLS", "delete")
}
+
+func (c *apisixTlsController) ResourceSync() {
+ objs := c.controller.apisixTlsInformer.GetIndexer().List()
+ for _, obj := range objs {
+ key, err := cache.MetaNamespaceKeyFunc(obj)
+ if err != nil {
+ log.Errorw("ApisixTls sync failed, found ApisixTls
object with bad namespace/name ignore it", zap.String("error", err.Error()))
+ continue
+ }
+ if !c.controller.isWatchingNamespace(key) {
+ continue
+ }
+ tls, err := kube.NewApisixTls(obj)
+ if err != nil {
+ log.Errorw("ApisixTls sync failed, found ApisixTls
resource with bad type", zap.Error(err))
+ continue
+ }
+ c.workqueue.Add(&types.Event{
+ Type: types.EventAdd,
+ Object: kube.ApisixTlsEvent{
+ Key: key,
+ GroupVersion: tls.GroupVersion(),
+ },
+ })
+ }
+}
diff --git a/pkg/ingress/apisix_upstream.go b/pkg/ingress/apisix_upstream.go
index ca882970..a82f5df8 100644
--- a/pkg/ingress/apisix_upstream.go
+++ b/pkg/ingress/apisix_upstream.go
@@ -301,3 +301,21 @@ func (c *apisixUpstreamController) onDelete(obj
interface{}) {
c.controller.MetricsCollector.IncrEvents("upstream", "delete")
}
+
+func (c *apisixUpstreamController) ResourceSync() {
+ clusterConfigs :=
c.controller.apisixUpstreamInformer.GetIndexer().List()
+ for _, clusterConfig := range clusterConfigs {
+ key, err := cache.MetaNamespaceKeyFunc(clusterConfig)
+ if err != nil {
+ log.Errorw("ApisixUpstream sync failed, found
ApisixUpstream resource with bad meta namespace key", zap.String("error",
err.Error()))
+ continue
+ }
+ if !c.controller.isWatchingNamespace(key) {
+ continue
+ }
+ c.workqueue.Add(&types.Event{
+ Type: types.EventAdd,
+ Object: key,
+ })
+ }
+}
diff --git a/pkg/ingress/controller.go b/pkg/ingress/controller.go
index 80c36e34..f6451a8e 100644
--- a/pkg/ingress/controller.go
+++ b/pkg/ingress/controller.go
@@ -64,6 +64,8 @@ const (
_resourceSyncAborted = "ResourceSyncAborted"
// _messageResourceFailed is used to report error
_messageResourceFailed = "%s synced failed, with error: %s"
+ // minimum interval for ingress sync to APISIX
+ _mininumApisixResourceSyncInterval = 60 * time.Second
)
// Controller is the ingress apisix controller object.
@@ -399,7 +401,6 @@ func (c *Controller) Run(stop chan struct{}) error {
ReleaseOnCancel: true,
Name: "ingress-apisix",
}
-
elector, err := leaderelection.NewLeaderElector(cfg)
if err != nil {
log.Errorf("failed to create leader elector: %s", err.Error())
@@ -569,6 +570,9 @@ func (c *Controller) run(ctx context.Context) {
c.apisixPluginConfigController.run(ctx)
})
+ c.goAttach(func() {
+ c.resourceSyncLoop(ctx,
c.cfg.ApisixResourceSyncInterval.Duration)
+ })
c.MetricsCollector.ResetLeader(true)
log.Infow("controller now is running as leader",
@@ -727,3 +731,57 @@ func (c *Controller) checkClusterHealth(ctx
context.Context, cancelFunc context.
c.MetricsCollector.IncrCheckClusterHealth(c.name)
}
}
+
+func (c *Controller) syncAllResources() {
+ wg := sync.WaitGroup{}
+ goAttach := func(handler func()) {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ handler()
+ }()
+ }
+ goAttach(func() {
+ c.apisixConsumerController.ResourceSync()
+ })
+ goAttach(func() {
+ c.apisixRouteController.ResourceSync()
+ })
+ goAttach(func() {
+ c.apisixClusterConfigController.ResourceSync()
+ })
+ goAttach(func() {
+ c.apisixPluginConfigController.ResourceSync()
+ })
+ goAttach(func() {
+ c.apisixUpstreamController.ResourceSync()
+ })
+ goAttach(func() {
+ c.apisixTlsController.ResourceSync()
+ })
+ goAttach(func() {
+ c.ingressController.ResourceSync()
+ })
+ wg.Wait()
+}
+
+func (c *Controller) resourceSyncLoop(ctx context.Context, interval
time.Duration) {
+ // The interval shall not be less than 60 seconds.
+ if interval < _mininumApisixResourceSyncInterval {
+ log.Warnw("The apisix-resource-sync-interval shall not be less
than 60 seconds.",
+ zap.String("apisix-resource-sync-interval",
interval.String()),
+ )
+ interval = _mininumApisixResourceSyncInterval
+ }
+ ticker := time.NewTicker(interval)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ticker.C:
+ c.syncAllResources()
+ continue
+ case <-ctx.Done():
+ return
+ }
+ }
+}
diff --git a/pkg/ingress/ingress.go b/pkg/ingress/ingress.go
index 8e5c0d1e..30053f04 100644
--- a/pkg/ingress/ingress.go
+++ b/pkg/ingress/ingress.go
@@ -403,3 +403,25 @@ func (c *ingressController) isIngressEffective(ing
kube.Ingress) bool {
}
return false
}
+
+func (c *ingressController) ResourceSync() {
+ objs := c.controller.ingressInformer.GetIndexer().List()
+ for _, obj := range objs {
+ key, err := cache.MetaNamespaceKeyFunc(obj)
+ if err != nil {
+ log.Errorw("found ApisixConsumer resource with bad meta
namespace key", zap.String("error", err.Error()))
+ continue
+ }
+ if !c.controller.isWatchingNamespace(key) {
+ continue
+ }
+ ing := kube.MustNewIngress(obj)
+ c.workqueue.Add(&types.Event{
+ Type: types.EventAdd,
+ Object: kube.IngressEvent{
+ Key: key,
+ GroupVersion: ing.GroupVersion(),
+ },
+ })
+ }
+}
diff --git a/samples/deploy/configmap/apisix-ingress-cm.yaml
b/samples/deploy/configmap/apisix-ingress-cm.yaml
index 5c98ff8f..edcc32b0 100644
--- a/samples/deploy/configmap/apisix-ingress-cm.yaml
+++ b/samples/deploy/configmap/apisix-ingress-cm.yaml
@@ -35,7 +35,7 @@ data:
http_listen: ":8080" # the HTTP Server listen address, default is ":8080"
enable_profiling: true # enable profiling via web interfaces
# host:port/debug/pprof, default is true.
-
+ apisix-resource-sync-interval: 300s # Default interval for synchronizing
Kubernetes resources to APISIX
# Kubernetes related configurations.
kubernetes:
kubeconfig: "" # the Kubernetes configuration file path, default
is
diff --git a/test/e2e/scaffold/ingress.go b/test/e2e/scaffold/ingress.go
index cef850df..13cd350b 100644
--- a/test/e2e/scaffold/ingress.go
+++ b/test/e2e/scaffold/ingress.go
@@ -277,6 +277,8 @@ spec:
- debug
- --log-output
- stdout
+ - --apisix-resource-sync-interval
+ - %s
- --http-listen
- :8080
- --https-listen
@@ -417,10 +419,10 @@ func (s *Scaffold) newIngressAPISIXController() error {
var ingressAPISIXDeployment string
label := fmt.Sprintf("apisix.ingress.watch=%s", s.namespace)
if s.opts.EnableWebhooks {
- ingressAPISIXDeployment =
fmt.Sprintf(s.FormatRegistry(_ingressAPISIXDeploymentTemplate),
s.opts.IngressAPISIXReplicas, s.namespace,
+ ingressAPISIXDeployment =
fmt.Sprintf(s.FormatRegistry(_ingressAPISIXDeploymentTemplate),
s.opts.IngressAPISIXReplicas, s.namespace, s.opts.ApisixResourceSyncInterval,
s.FormatNamespaceLabel(label),
s.opts.APISIXRouteVersion, s.opts.APISIXPublishAddress,
s.opts.EnableGatewayAPI, _volumeMounts, _webhookCertSecret)
} else {
- ingressAPISIXDeployment =
fmt.Sprintf(s.FormatRegistry(_ingressAPISIXDeploymentTemplate),
s.opts.IngressAPISIXReplicas, s.namespace,
+ ingressAPISIXDeployment =
fmt.Sprintf(s.FormatRegistry(_ingressAPISIXDeploymentTemplate),
s.opts.IngressAPISIXReplicas, s.namespace, s.opts.ApisixResourceSyncInterval,
s.FormatNamespaceLabel(label),
s.opts.APISIXRouteVersion, s.opts.APISIXPublishAddress,
s.opts.EnableGatewayAPI, "", _webhookCertSecret)
}
@@ -527,9 +529,10 @@ func (s *Scaffold) ScaleIngressController(desired int)
error {
var ingressDeployment string
label := fmt.Sprintf("apisix.ingress.watch=%s", s.namespace)
if s.opts.EnableWebhooks {
- ingressDeployment =
fmt.Sprintf(s.FormatRegistry(_ingressAPISIXDeploymentTemplate), desired,
s.namespace, label, s.opts.APISIXRouteVersion, s.opts.APISIXPublishAddress,
s.opts.EnableGatewayAPI, _volumeMounts, _webhookCertSecret)
+
+ ingressDeployment =
fmt.Sprintf(s.FormatRegistry(_ingressAPISIXDeploymentTemplate), desired,
s.namespace, s.opts.ApisixResourceSyncInterval, label,
s.opts.APISIXRouteVersion, s.opts.APISIXPublishAddress,
s.opts.EnableGatewayAPI, _volumeMounts, _webhookCertSecret)
} else {
- ingressDeployment =
fmt.Sprintf(s.FormatRegistry(_ingressAPISIXDeploymentTemplate), desired,
s.namespace, label, s.opts.APISIXRouteVersion, s.opts.APISIXPublishAddress,
s.opts.EnableGatewayAPI, "", _webhookCertSecret)
+ ingressDeployment =
fmt.Sprintf(s.FormatRegistry(_ingressAPISIXDeploymentTemplate), desired,
s.namespace, s.opts.ApisixResourceSyncInterval, label,
s.opts.APISIXRouteVersion, s.opts.APISIXPublishAddress,
s.opts.EnableGatewayAPI, "", _webhookCertSecret)
}
if err := k8s.KubectlApplyFromStringE(s.t, s.kubectlOptions,
ingressDeployment); err != nil {
return err
diff --git a/test/e2e/scaffold/k8s.go b/test/e2e/scaffold/k8s.go
index b788c92f..e9c6cbb1 100644
--- a/test/e2e/scaffold/k8s.go
+++ b/test/e2e/scaffold/k8s.go
@@ -252,7 +252,7 @@ func (s *Scaffold)
EnsureNumApisixPluginConfigCreated(desired int) error {
return s.ensureNumApisixCRDsCreated(u.String(), desired)
}
-// CreateApisixRouteByApisixAdmin create a route
+// CreateApisixRouteByApisixAdmin create or update a route
func (s *Scaffold) CreateApisixRouteByApisixAdmin(routeID string, body []byte)
error {
u := url.URL{
Scheme: "http",
@@ -262,6 +262,16 @@ func (s *Scaffold) CreateApisixRouteByApisixAdmin(routeID
string, body []byte) e
return s.ensureAdminOperationIsSuccessful(u.String(), "PUT", body)
}
+// CreateApisixRouteByApisixAdmin create or update a consumer
+func (s *Scaffold) CreateApisixConsumerByApisixAdmin(body []byte) error {
+ u := url.URL{
+ Scheme: "http",
+ Host: s.apisixAdminTunnel.Endpoint(),
+ Path: "/apisix/admin/consumers",
+ }
+ return s.ensureAdminOperationIsSuccessful(u.String(), "PUT", body)
+}
+
// DeleteApisixRouteByApisixAdmin deletes a route by its route name in APISIX
cluster.
func (s *Scaffold) DeleteApisixRouteByApisixAdmin(routeID string) error {
u := url.URL{
@@ -272,6 +282,16 @@ func (s *Scaffold) DeleteApisixRouteByApisixAdmin(routeID
string) error {
return s.ensureAdminOperationIsSuccessful(u.String(), "DELETE", nil)
}
+// DeleteApisixConsumerByApisixAdmin deletes a consumer by its consumer name
in APISIX cluster.
+func (s *Scaffold) DeleteApisixConsumerByApisixAdmin(consumerName string)
error {
+ u := url.URL{
+ Scheme: "http",
+ Host: s.apisixAdminTunnel.Endpoint(),
+ Path: "/apisix/admin/consumers/" + consumerName,
+ }
+ return s.ensureAdminOperationIsSuccessful(u.String(), "DELETE", nil)
+}
+
func (s *Scaffold) ensureAdminOperationIsSuccessful(url, method string, body
[]byte) error {
condFunc := func() (bool, error) {
req, err := http.NewRequest(method, url, bytes.NewBuffer(body))
diff --git a/test/e2e/scaffold/scaffold.go b/test/e2e/scaffold/scaffold.go
index 01d4f50c..ee29fcc5 100644
--- a/test/e2e/scaffold/scaffold.go
+++ b/test/e2e/scaffold/scaffold.go
@@ -61,6 +61,7 @@ type Options struct {
EnableWebhooks bool
APISIXPublishAddress string
disableNamespaceSelector bool
+ ApisixResourceSyncInterval string
EnableGatewayAPI bool
}
@@ -127,6 +128,9 @@ func NewScaffold(o *Options) *Scaffold {
if o.APISIXAdminAPIKey == "" {
o.APISIXAdminAPIKey = "edd1c9f034335f136f87ad84b625c8f1"
}
+ if o.ApisixResourceSyncInterval == "" {
+ o.ApisixResourceSyncInterval = "300s"
+ }
defer ginkgo.GinkgoRecover()
s := &Scaffold{
diff --git a/test/e2e/suite-ingress/resourcesync.go
b/test/e2e/suite-ingress/resourcesync.go
new file mode 100644
index 00000000..559c36fe
--- /dev/null
+++ b/test/e2e/suite-ingress/resourcesync.go
@@ -0,0 +1,227 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package ingress
+
+import (
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "time"
+
+ ginkgo "github.com/onsi/ginkgo/v2"
+ "github.com/stretchr/testify/assert"
+
+ "github.com/apache/apisix-ingress-controller/pkg/id"
+ "github.com/apache/apisix-ingress-controller/test/e2e/scaffold"
+)
+
+var _ = ginkgo.Describe("suite-ingress: apisix resource sync", func() {
+ opts := &scaffold.Options{
+ Name: "default",
+ Kubeconfig: scaffold.GetKubeconfig(),
+ APISIXConfigPath: "testdata/apisix-gw-config.yaml",
+ IngressAPISIXReplicas: 1,
+ HTTPBinServicePort: 80,
+ APISIXRouteVersion: "apisix.apache.org/v2beta3",
+ ApisixResourceSyncInterval: "60s",
+ }
+ s := scaffold.NewScaffold(opts)
+ ginkgo.JustBeforeEach(func() {
+ backendSvc, backendPorts := s.DefaultHTTPBackend()
+ // Create ApisixRoute resource
+ ar := fmt.Sprintf(`
+apiVersion: apisix.apache.org/v2beta3
+kind: ApisixRoute
+metadata:
+ name: httpbin-route
+spec:
+ http:
+ - name: rule1
+ match:
+ hosts:
+ - httpbin.org
+ paths:
+ - /ip
+ backends:
+ - serviceName: %s
+ servicePort: %d
+ authentication:
+ enable: true
+ type: keyAuth
+`, backendSvc, backendPorts[0])
+ assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(ar))
+ err := s.EnsureNumApisixUpstreamsCreated(1)
+ assert.Nil(ginkgo.GinkgoT(), err, "Checking number of
upstreams")
+ err = s.EnsureNumApisixRoutesCreated(1)
+ assert.Nil(ginkgo.GinkgoT(), err, "Checking number of routes")
+
+ // Create Ingress resource
+ ing := fmt.Sprintf(`
+apiVersion: networking.k8s.io/v1
+kind: Ingress
+metadata:
+ annotations:
+ kubernetes.io/ingress.class: apisix
+ name: ingress-route
+spec:
+ rules:
+ - host: local.httpbin.org
+ http:
+ paths:
+ - path: /headers
+ pathType: Exact
+ backend:
+ service:
+ name: %s
+ port:
+ number: %d
+`, backendSvc, backendPorts[0])
+ assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(ing))
+
+ // Create ApisixConsumer resource
+ err = s.ApisixConsumerKeyAuthCreated("foo", "foo-key")
+ assert.Nil(ginkgo.GinkgoT(), err)
+ })
+
+ ginkgo.It("for modified resource sync consistency", func() {
+ // crd resource sync interval
+ readyTime := time.Now().Add(60 * time.Second)
+
+ routes, _ := s.ListApisixRoutes()
+ assert.Len(ginkgo.GinkgoT(), routes, 2)
+
+ consumers, _ := s.ListApisixConsumers()
+ assert.Len(ginkgo.GinkgoT(), consumers, 1)
+
+ for _, route := range routes {
+ _ =
s.CreateApisixRouteByApisixAdmin(id.GenID(route.Name), []byte(`
+{
+ "methods": ["GET"],
+ "uri": "/anything",
+ "plugins": {
+ "key-auth": {}
+ },
+ "upstream": {
+ "type": "roundrobin",
+ "nodes": {
+ "httpbin.org": 1
+ }
+ }
+}`))
+ }
+
+ for _, consumer := range consumers {
+ _ =
s.CreateApisixConsumerByApisixAdmin([]byte(fmt.Sprintf(`
+{
+ "username": "%s",
+ "plugins": {
+ "key-auth": {
+ "key": "auth-one"
+ }
+ }
+}`, consumer.Username)))
+ }
+
+ _ = s.NewAPISIXClient().
+ GET("/ip").
+ WithHeader("Host", "httpbin.org").
+ Expect().
+ Status(http.StatusNotFound)
+
+ _ = s.NewAPISIXClient().
+ GET("/headers").
+ WithHeader("Host", "local.httpbin.org").
+ Expect().
+ Status(http.StatusNotFound)
+
+ waitTime := time.Until(readyTime).Seconds()
+ time.Sleep(time.Duration(waitTime) * time.Second)
+
+ _ = s.NewAPISIXClient().
+ GET("/ip").
+ WithHeader("Host", "httpbin.org").
+ WithHeader("apikey", "foo-key").
+ Expect().
+ Status(http.StatusOK)
+
+ _ = s.NewAPISIXClient().
+ GET("/headers").
+ WithHeader("Host", "local.httpbin.org").
+ Expect().
+ Status(http.StatusOK)
+
+ consumers, _ = s.ListApisixConsumers()
+ assert.Len(ginkgo.GinkgoT(), consumers, 1)
+ data, _ := json.Marshal(consumers[0])
+ assert.Contains(ginkgo.GinkgoT(), string(data), "foo-key")
+ })
+
+ ginkgo.It("for deleted resource sync consistency", func() {
+ // crd resource sync interval
+ readyTime := time.Now().Add(60 * time.Second)
+
+ routes, _ := s.ListApisixRoutes()
+ assert.Len(ginkgo.GinkgoT(), routes, 2)
+
+ consumers, _ := s.ListApisixConsumers()
+ assert.Len(ginkgo.GinkgoT(), consumers, 1)
+
+ for _, route := range routes {
+ _ =
s.DeleteApisixRouteByApisixAdmin(id.GenID(route.Name))
+ }
+
+ for _, consumer := range consumers {
+ s.DeleteApisixConsumerByApisixAdmin(consumer.Username)
+ }
+
+ _ = s.NewAPISIXClient().
+ GET("/ip").
+ WithHeader("Host", "httpbin.org").
+ Expect().
+ Status(http.StatusNotFound)
+
+ _ = s.NewAPISIXClient().
+ GET("/headers").
+ WithHeader("Host", "local.httpbin.org").
+ Expect().
+ Status(http.StatusNotFound)
+
+ routes, _ = s.ListApisixRoutes()
+ assert.Len(ginkgo.GinkgoT(), routes, 0)
+ consumers, _ = s.ListApisixConsumers()
+ assert.Len(ginkgo.GinkgoT(), consumers, 0)
+
+ waitTime := time.Until(readyTime).Seconds()
+ time.Sleep(time.Duration(waitTime) * time.Second)
+
+ _ = s.NewAPISIXClient().
+ GET("/ip").
+ WithHeader("Host", "httpbin.org").
+ WithHeader("apikey", "foo-key").
+ Expect().
+ Status(http.StatusOK)
+
+ _ = s.NewAPISIXClient().
+ GET("/headers").
+ WithHeader("Host", "local.httpbin.org").
+ Expect().
+ Status(http.StatusOK)
+
+ consumers, _ = s.ListApisixConsumers()
+ assert.Len(ginkgo.GinkgoT(), consumers, 1)
+ data, _ := json.Marshal(consumers[0])
+ assert.Contains(ginkgo.GinkgoT(), string(data), "foo-key")
+ })
+})