This is an automated email from the ASF dual-hosted git repository.

zhangjintao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git


The following commit(s) were added to refs/heads/master by this push:
     new 6394cdd1 feat: sync CRD and ingress resource to APISIX mechanism. 
(#1022)
6394cdd1 is described below

commit 6394cdd11e96f205756bb8c19eaa047bdc114774
Author: Xin Rong <[email protected]>
AuthorDate: Wed Jun 22 23:29:29 2022 +0800

    feat: sync CRD and ingress resource to APISIX mechanism. (#1022)
---
 cmd/ingress/ingress.go                          |   1 +
 conf/config-default.yaml                        |   2 +-
 pkg/config/config.go                            |  42 ++---
 pkg/config/config_test.go                       |  41 +++--
 pkg/ingress/apisix_cluster_config.go            |  26 +++
 pkg/ingress/apisix_consumer.go                  |  26 +++
 pkg/ingress/apisix_pluginconfig.go              |  22 +++
 pkg/ingress/apisix_route.go                     |  22 +++
 pkg/ingress/apisix_tls.go                       |  26 +++
 pkg/ingress/apisix_upstream.go                  |  18 ++
 pkg/ingress/controller.go                       |  60 ++++++-
 pkg/ingress/ingress.go                          |  22 +++
 samples/deploy/configmap/apisix-ingress-cm.yaml |   2 +-
 test/e2e/scaffold/ingress.go                    |  11 +-
 test/e2e/scaffold/k8s.go                        |  22 ++-
 test/e2e/scaffold/scaffold.go                   |   4 +
 test/e2e/suite-ingress/resourcesync.go          | 227 ++++++++++++++++++++++++
 17 files changed, 528 insertions(+), 46 deletions(-)

diff --git a/cmd/ingress/ingress.go b/cmd/ingress/ingress.go
index 324ff7b1..33005e32 100644
--- a/cmd/ingress/ingress.go
+++ b/cmd/ingress/ingress.go
@@ -171,6 +171,7 @@ For example, no available LB exists in the bare metal 
environment.`)
        cmd.PersistentFlags().StringVar(&cfg.APISIX.DefaultClusterBaseURL, 
"default-apisix-cluster-base-url", "", "the base URL of admin api / manager api 
for the default APISIX cluster")
        cmd.PersistentFlags().StringVar(&cfg.APISIX.DefaultClusterAdminKey, 
"default-apisix-cluster-admin-key", "", "admin key used for the authorization 
of admin api / manager api for the default APISIX cluster")
        cmd.PersistentFlags().StringVar(&cfg.APISIX.DefaultClusterName, 
"default-apisix-cluster-name", "default", "name of the default apisix cluster")
+       
cmd.PersistentFlags().DurationVar(&cfg.ApisixResourceSyncInterval.Duration, 
"apisix-resource-sync-interval", 300*time.Second, "interval between syncs in 
seconds. Default value is 300s.")
 
        if err := cmd.PersistentFlags().MarkDeprecated("app-namespace", "use 
namespace-selector instead"); err != nil {
                dief("failed to mark `app-namespace` as deprecated: %s", err)
diff --git a/conf/config-default.yaml b/conf/config-default.yaml
index 5e05390a..55327366 100644
--- a/conf/config-default.yaml
+++ b/conf/config-default.yaml
@@ -44,7 +44,7 @@ ingress_status_address: []   # when there is no available 
information on the Ser
                              # For example, no available LB exists in the bare 
metal environment.
 enable_profiling: true # enable profiling via web interfaces
                        # host:port/debug/pprof, default is true.
-
+apisix-resource-sync-interval: "300s" # Default interval for synchronizing 
Kubernetes resources to APISIX
 # Kubernetes related configurations.
 kubernetes:
   kubeconfig: ""                       # the Kubernetes configuration file 
path, default is
diff --git a/pkg/config/config.go b/pkg/config/config.go
index af6d3665..d5f6f350 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -72,17 +72,18 @@ const (
 // Config contains all config items which are necessary for
 // apisix-ingress-controller's running.
 type Config struct {
-       CertFilePath          string           `json:"cert_file" 
yaml:"cert_file"`
-       KeyFilePath           string           `json:"key_file" yaml:"key_file"`
-       LogLevel              string           `json:"log_level" 
yaml:"log_level"`
-       LogOutput             string           `json:"log_output" 
yaml:"log_output"`
-       HTTPListen            string           `json:"http_listen" 
yaml:"http_listen"`
-       HTTPSListen           string           `json:"https_listen" 
yaml:"https_listen"`
-       IngressPublishService string           `json:"ingress_publish_service" 
yaml:"ingress_publish_service"`
-       IngressStatusAddress  []string         `json:"ingress_status_address" 
yaml:"ingress_status_address"`
-       EnableProfiling       bool             `json:"enable_profiling" 
yaml:"enable_profiling"`
-       Kubernetes            KubernetesConfig `json:"kubernetes" 
yaml:"kubernetes"`
-       APISIX                APISIXConfig     `json:"apisix" yaml:"apisix"`
+       CertFilePath               string             `json:"cert_file" 
yaml:"cert_file"`
+       KeyFilePath                string             `json:"key_file" 
yaml:"key_file"`
+       LogLevel                   string             `json:"log_level" 
yaml:"log_level"`
+       LogOutput                  string             `json:"log_output" 
yaml:"log_output"`
+       HTTPListen                 string             `json:"http_listen" 
yaml:"http_listen"`
+       HTTPSListen                string             `json:"https_listen" 
yaml:"https_listen"`
+       IngressPublishService      string             
`json:"ingress_publish_service" yaml:"ingress_publish_service"`
+       IngressStatusAddress       []string           
`json:"ingress_status_address" yaml:"ingress_status_address"`
+       EnableProfiling            bool               `json:"enable_profiling" 
yaml:"enable_profiling"`
+       Kubernetes                 KubernetesConfig   `json:"kubernetes" 
yaml:"kubernetes"`
+       APISIX                     APISIXConfig       `json:"apisix" 
yaml:"apisix"`
+       ApisixResourceSyncInterval types.TimeDuration 
`json:"apisix-resource-sync-interval" yaml:"apisix-resource-sync-interval"`
 }
 
 // KubernetesConfig contains all Kubernetes related config items.
@@ -118,15 +119,16 @@ type APISIXConfig struct {
 // default value.
 func NewDefaultConfig() *Config {
        return &Config{
-               LogLevel:              "warn",
-               LogOutput:             "stderr",
-               HTTPListen:            ":8080",
-               HTTPSListen:           ":8443",
-               IngressPublishService: "",
-               IngressStatusAddress:  []string{},
-               CertFilePath:          "/etc/webhook/certs/cert.pem",
-               KeyFilePath:           "/etc/webhook/certs/key.pem",
-               EnableProfiling:       true,
+               LogLevel:                   "warn",
+               LogOutput:                  "stderr",
+               HTTPListen:                 ":8080",
+               HTTPSListen:                ":8443",
+               IngressPublishService:      "",
+               IngressStatusAddress:       []string{},
+               CertFilePath:               "/etc/webhook/certs/cert.pem",
+               KeyFilePath:                "/etc/webhook/certs/key.pem",
+               EnableProfiling:            true,
+               ApisixResourceSyncInterval: types.TimeDuration{Duration: 300 * 
time.Second},
                Kubernetes: KubernetesConfig{
                        Kubeconfig:                 "", // Use in-cluster 
configurations.
                        ResyncInterval:             
types.TimeDuration{Duration: 6 * time.Hour},
diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go
index aa0a3077..cdca004a 100644
--- a/pkg/config/config_test.go
+++ b/pkg/config/config_test.go
@@ -28,15 +28,16 @@ import (
 
 func TestNewConfigFromFile(t *testing.T) {
        cfg := &Config{
-               LogLevel:              "warn",
-               LogOutput:             "stdout",
-               HTTPListen:            ":9090",
-               HTTPSListen:           ":9443",
-               IngressPublishService: "",
-               IngressStatusAddress:  []string{},
-               CertFilePath:          "/etc/webhook/certs/cert.pem",
-               KeyFilePath:           "/etc/webhook/certs/key.pem",
-               EnableProfiling:       true,
+               LogLevel:                   "warn",
+               LogOutput:                  "stdout",
+               HTTPListen:                 ":9090",
+               HTTPSListen:                ":9443",
+               IngressPublishService:      "",
+               IngressStatusAddress:       []string{},
+               CertFilePath:               "/etc/webhook/certs/cert.pem",
+               KeyFilePath:                "/etc/webhook/certs/key.pem",
+               EnableProfiling:            true,
+               ApisixResourceSyncInterval: types.TimeDuration{Duration: 200 * 
time.Second},
                Kubernetes: KubernetesConfig{
                        ResyncInterval:             
types.TimeDuration{Duration: time.Hour},
                        Kubeconfig:                 "/path/to/foo/baz",
@@ -86,6 +87,7 @@ https_listen: :9443
 ingress_publish_service: ""
 ingress_status_address: []
 enable_profiling: true
+apisix-resource-sync-interval: 200s
 kubernetes:
   kubeconfig: /path/to/foo/baz
   resync_interval: 1h0m0s
@@ -113,15 +115,16 @@ apisix:
 
 func TestConfigWithEnvVar(t *testing.T) {
        cfg := &Config{
-               LogLevel:              "warn",
-               LogOutput:             "stdout",
-               HTTPListen:            ":9090",
-               HTTPSListen:           ":9443",
-               IngressPublishService: "",
-               IngressStatusAddress:  []string{},
-               CertFilePath:          "/etc/webhook/certs/cert.pem",
-               KeyFilePath:           "/etc/webhook/certs/key.pem",
-               EnableProfiling:       true,
+               LogLevel:                   "warn",
+               LogOutput:                  "stdout",
+               HTTPListen:                 ":9090",
+               HTTPSListen:                ":9443",
+               IngressPublishService:      "",
+               IngressStatusAddress:       []string{},
+               CertFilePath:               "/etc/webhook/certs/cert.pem",
+               KeyFilePath:                "/etc/webhook/certs/key.pem",
+               EnableProfiling:            true,
+               ApisixResourceSyncInterval: types.TimeDuration{Duration: 200 * 
time.Second},
                Kubernetes: KubernetesConfig{
                        ResyncInterval:             
types.TimeDuration{Duration: time.Hour},
                        Kubeconfig:                 "",
@@ -160,6 +163,7 @@ func TestConfigWithEnvVar(t *testing.T) {
        "ingress_publish_service": "",
        "ingress_status_address": [],
     "enable_profiling": true,
+       "apisix-resource-sync-interval": "200s",
     "kubernetes": {
         "kubeconfig": "{{.KUBECONFIG}}",
         "resync_interval": "1h0m0s",
@@ -195,6 +199,7 @@ https_listen: :9443
 ingress_publish_service: ""
 ingress_status_address: []
 enable_profiling: true
+apisix-resource-sync-interval: 200s
 kubernetes:
   resync_interval: 1h0m0s
   kubeconfig: "{{.KUBECONFIG}}"
diff --git a/pkg/ingress/apisix_cluster_config.go 
b/pkg/ingress/apisix_cluster_config.go
index ecc33d0e..fa346381 100644
--- a/pkg/ingress/apisix_cluster_config.go
+++ b/pkg/ingress/apisix_cluster_config.go
@@ -403,3 +403,29 @@ func (c *apisixClusterConfigController) onDelete(obj 
interface{}) {
 
        c.controller.MetricsCollector.IncrEvents("clusterConfig", "delete")
 }
+
+func (c *apisixClusterConfigController) ResourceSync() {
+       objs := c.controller.apisixClusterConfigInformer.GetIndexer().List()
+       for _, obj := range objs {
+               key, err := cache.MetaNamespaceKeyFunc(obj)
+               if err != nil {
+                       log.Errorw("ApisixClusterConfig sync failed, found 
ApisixClusterConfig resource with bad meta namespace key", zap.String("error", 
err.Error()))
+                       continue
+               }
+               if !c.controller.isWatchingNamespace(key) {
+                       continue
+               }
+               acc, err := kube.NewApisixClusterConfig(obj)
+               if err != nil {
+                       log.Errorw("found ApisixClusterConfig resource with bad 
type", zap.String("error", err.Error()))
+                       return
+               }
+               c.workqueue.Add(&types.Event{
+                       Type: types.EventAdd,
+                       Object: kube.ApisixClusterConfigEvent{
+                               Key:          key,
+                               GroupVersion: acc.GroupVersion(),
+                       },
+               })
+       }
+}
diff --git a/pkg/ingress/apisix_consumer.go b/pkg/ingress/apisix_consumer.go
index 8a456a70..581239dd 100644
--- a/pkg/ingress/apisix_consumer.go
+++ b/pkg/ingress/apisix_consumer.go
@@ -318,3 +318,29 @@ func (c *apisixConsumerController) onDelete(obj 
interface{}) {
 
        c.controller.MetricsCollector.IncrEvents("consumer", "delete")
 }
+
+func (c *apisixConsumerController) ResourceSync() {
+       objs := c.controller.apisixConsumerInformer.GetIndexer().List()
+       for _, obj := range objs {
+               key, err := cache.MetaNamespaceKeyFunc(obj)
+               if err != nil {
+                       log.Errorw("ApisixConsumer sync failed, found 
ApisixConsumer resource with bad meta namespace key", zap.String("error", 
err.Error()))
+                       continue
+               }
+               if !c.controller.isWatchingNamespace(key) {
+                       continue
+               }
+               ac, err := kube.NewApisixConsumer(obj)
+               if err != nil {
+                       log.Errorw("found ApisixConsumer resource with bad 
type", zap.String("error", err.Error()))
+                       return
+               }
+               c.workqueue.Add(&types.Event{
+                       Type: types.EventAdd,
+                       Object: kube.ApisixConsumerEvent{
+                               Key:          key,
+                               GroupVersion: ac.GroupVersion(),
+                       },
+               })
+       }
+}
diff --git a/pkg/ingress/apisix_pluginconfig.go 
b/pkg/ingress/apisix_pluginconfig.go
index 3faf885b..fa7e64a7 100644
--- a/pkg/ingress/apisix_pluginconfig.go
+++ b/pkg/ingress/apisix_pluginconfig.go
@@ -367,3 +367,25 @@ func (c *apisixPluginConfigController) onDelete(obj 
interface{}) {
 
        c.controller.MetricsCollector.IncrEvents("PluginConfig", "delete")
 }
+
+func (c *apisixPluginConfigController) ResourceSync() {
+       objs := c.controller.apisixPluginConfigInformer.GetIndexer().List()
+       for _, obj := range objs {
+               key, err := cache.MetaNamespaceKeyFunc(obj)
+               if err != nil {
+                       log.Errorw("ApisixPluginConfig sync failed, found 
ApisixPluginConfig resource with bad meta namespace key", zap.String("error", 
err.Error()))
+                       continue
+               }
+               if !c.controller.isWatchingNamespace(key) {
+                       continue
+               }
+               apc := kube.MustNewApisixPluginConfig(obj)
+               c.workqueue.Add(&types.Event{
+                       Type: types.EventAdd,
+                       Object: kube.ApisixPluginConfigEvent{
+                               Key:          key,
+                               GroupVersion: apc.GroupVersion(),
+                       },
+               })
+       }
+}
diff --git a/pkg/ingress/apisix_route.go b/pkg/ingress/apisix_route.go
index c2ecee33..a2f95374 100644
--- a/pkg/ingress/apisix_route.go
+++ b/pkg/ingress/apisix_route.go
@@ -449,3 +449,25 @@ func (c *apisixRouteController) onDelete(obj interface{}) {
 
        c.controller.MetricsCollector.IncrEvents("route", "delete")
 }
+
+func (c *apisixRouteController) ResourceSync() {
+       objs := c.controller.apisixRouteInformer.GetIndexer().List()
+       for _, obj := range objs {
+               key, err := cache.MetaNamespaceKeyFunc(obj)
+               if err != nil {
+                       log.Errorw("ApisixRoute sync failed, found ApisixRoute 
resource with bad meta namespace key", zap.String("error", err.Error()))
+                       continue
+               }
+               if !c.controller.isWatchingNamespace(key) {
+                       continue
+               }
+               ar := kube.MustNewApisixRoute(obj)
+               c.workqueue.Add(&types.Event{
+                       Type: types.EventAdd,
+                       Object: kube.ApisixRouteEvent{
+                               Key:          key,
+                               GroupVersion: ar.GroupVersion(),
+                       },
+               })
+       }
+}
diff --git a/pkg/ingress/apisix_tls.go b/pkg/ingress/apisix_tls.go
index 66bc8442..be484edc 100644
--- a/pkg/ingress/apisix_tls.go
+++ b/pkg/ingress/apisix_tls.go
@@ -359,3 +359,29 @@ func (c *apisixTlsController) onDelete(obj interface{}) {
 
        c.controller.MetricsCollector.IncrEvents("TLS", "delete")
 }
+
+func (c *apisixTlsController) ResourceSync() {
+       objs := c.controller.apisixTlsInformer.GetIndexer().List()
+       for _, obj := range objs {
+               key, err := cache.MetaNamespaceKeyFunc(obj)
+               if err != nil {
+                       log.Errorw("ApisixTls sync failed, found ApisixTls 
object with bad namespace/name ignore it", zap.String("error", err.Error()))
+                       continue
+               }
+               if !c.controller.isWatchingNamespace(key) {
+                       continue
+               }
+               tls, err := kube.NewApisixTls(obj)
+               if err != nil {
+                       log.Errorw("ApisixTls sync failed, found ApisixTls 
resource with bad type", zap.Error(err))
+                       continue
+               }
+               c.workqueue.Add(&types.Event{
+                       Type: types.EventAdd,
+                       Object: kube.ApisixTlsEvent{
+                               Key:          key,
+                               GroupVersion: tls.GroupVersion(),
+                       },
+               })
+       }
+}
diff --git a/pkg/ingress/apisix_upstream.go b/pkg/ingress/apisix_upstream.go
index ca882970..a82f5df8 100644
--- a/pkg/ingress/apisix_upstream.go
+++ b/pkg/ingress/apisix_upstream.go
@@ -301,3 +301,21 @@ func (c *apisixUpstreamController) onDelete(obj 
interface{}) {
 
        c.controller.MetricsCollector.IncrEvents("upstream", "delete")
 }
+
+func (c *apisixUpstreamController) ResourceSync() {
+       clusterConfigs := 
c.controller.apisixUpstreamInformer.GetIndexer().List()
+       for _, clusterConfig := range clusterConfigs {
+               key, err := cache.MetaNamespaceKeyFunc(clusterConfig)
+               if err != nil {
+                       log.Errorw("ApisixUpstream sync failed, found 
ApisixUpstream resource with bad meta namespace key", zap.String("error", 
err.Error()))
+                       continue
+               }
+               if !c.controller.isWatchingNamespace(key) {
+                       continue
+               }
+               c.workqueue.Add(&types.Event{
+                       Type:   types.EventAdd,
+                       Object: key,
+               })
+       }
+}
diff --git a/pkg/ingress/controller.go b/pkg/ingress/controller.go
index 80c36e34..f6451a8e 100644
--- a/pkg/ingress/controller.go
+++ b/pkg/ingress/controller.go
@@ -64,6 +64,8 @@ const (
        _resourceSyncAborted = "ResourceSyncAborted"
        // _messageResourceFailed is used to report error
        _messageResourceFailed = "%s synced failed, with error: %s"
+       // minimum interval for ingress sync to APISIX
+       _mininumApisixResourceSyncInterval = 60 * time.Second
 )
 
 // Controller is the ingress apisix controller object.
@@ -399,7 +401,6 @@ func (c *Controller) Run(stop chan struct{}) error {
                ReleaseOnCancel: true,
                Name:            "ingress-apisix",
        }
-
        elector, err := leaderelection.NewLeaderElector(cfg)
        if err != nil {
                log.Errorf("failed to create leader elector: %s", err.Error())
@@ -569,6 +570,9 @@ func (c *Controller) run(ctx context.Context) {
                c.apisixPluginConfigController.run(ctx)
        })
 
+       c.goAttach(func() {
+               c.resourceSyncLoop(ctx, 
c.cfg.ApisixResourceSyncInterval.Duration)
+       })
        c.MetricsCollector.ResetLeader(true)
 
        log.Infow("controller now is running as leader",
@@ -727,3 +731,57 @@ func (c *Controller) checkClusterHealth(ctx 
context.Context, cancelFunc context.
                c.MetricsCollector.IncrCheckClusterHealth(c.name)
        }
 }
+
+func (c *Controller) syncAllResources() {
+       wg := sync.WaitGroup{}
+       goAttach := func(handler func()) {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       handler()
+               }()
+       }
+       goAttach(func() {
+               c.apisixConsumerController.ResourceSync()
+       })
+       goAttach(func() {
+               c.apisixRouteController.ResourceSync()
+       })
+       goAttach(func() {
+               c.apisixClusterConfigController.ResourceSync()
+       })
+       goAttach(func() {
+               c.apisixPluginConfigController.ResourceSync()
+       })
+       goAttach(func() {
+               c.apisixUpstreamController.ResourceSync()
+       })
+       goAttach(func() {
+               c.apisixTlsController.ResourceSync()
+       })
+       goAttach(func() {
+               c.ingressController.ResourceSync()
+       })
+       wg.Wait()
+}
+
+func (c *Controller) resourceSyncLoop(ctx context.Context, interval 
time.Duration) {
+       // The interval shall not be less than 60 seconds.
+       if interval < _mininumApisixResourceSyncInterval {
+               log.Warnw("The apisix-resource-sync-interval shall not be less 
than 60 seconds.",
+                       zap.String("apisix-resource-sync-interval", 
interval.String()),
+               )
+               interval = _mininumApisixResourceSyncInterval
+       }
+       ticker := time.NewTicker(interval)
+       defer ticker.Stop()
+       for {
+               select {
+               case <-ticker.C:
+                       c.syncAllResources()
+                       continue
+               case <-ctx.Done():
+                       return
+               }
+       }
+}
diff --git a/pkg/ingress/ingress.go b/pkg/ingress/ingress.go
index 8e5c0d1e..30053f04 100644
--- a/pkg/ingress/ingress.go
+++ b/pkg/ingress/ingress.go
@@ -403,3 +403,25 @@ func (c *ingressController) isIngressEffective(ing 
kube.Ingress) bool {
        }
        return false
 }
+
+func (c *ingressController) ResourceSync() {
+       objs := c.controller.ingressInformer.GetIndexer().List()
+       for _, obj := range objs {
+               key, err := cache.MetaNamespaceKeyFunc(obj)
+               if err != nil {
+                       log.Errorw("found ApisixConsumer resource with bad meta 
namespace key", zap.String("error", err.Error()))
+                       continue
+               }
+               if !c.controller.isWatchingNamespace(key) {
+                       continue
+               }
+               ing := kube.MustNewIngress(obj)
+               c.workqueue.Add(&types.Event{
+                       Type: types.EventAdd,
+                       Object: kube.IngressEvent{
+                               Key:          key,
+                               GroupVersion: ing.GroupVersion(),
+                       },
+               })
+       }
+}
diff --git a/samples/deploy/configmap/apisix-ingress-cm.yaml 
b/samples/deploy/configmap/apisix-ingress-cm.yaml
index 5c98ff8f..edcc32b0 100644
--- a/samples/deploy/configmap/apisix-ingress-cm.yaml
+++ b/samples/deploy/configmap/apisix-ingress-cm.yaml
@@ -35,7 +35,7 @@ data:
    http_listen: ":8080"   # the HTTP Server listen address, default is ":8080"
    enable_profiling: true # enable profiling via web interfaces
                           # host:port/debug/pprof, default is true.
-
+   apisix-resource-sync-interval: 300s # Default interval for synchronizing 
Kubernetes resources to APISIX
    # Kubernetes related configurations.
    kubernetes:
      kubeconfig: ""         # the Kubernetes configuration file path, default 
is
diff --git a/test/e2e/scaffold/ingress.go b/test/e2e/scaffold/ingress.go
index cef850df..13cd350b 100644
--- a/test/e2e/scaffold/ingress.go
+++ b/test/e2e/scaffold/ingress.go
@@ -277,6 +277,8 @@ spec:
             - debug
             - --log-output
             - stdout
+            - --apisix-resource-sync-interval
+            - %s
             - --http-listen
             - :8080
             - --https-listen
@@ -417,10 +419,10 @@ func (s *Scaffold) newIngressAPISIXController() error {
        var ingressAPISIXDeployment string
        label := fmt.Sprintf("apisix.ingress.watch=%s", s.namespace)
        if s.opts.EnableWebhooks {
-               ingressAPISIXDeployment = 
fmt.Sprintf(s.FormatRegistry(_ingressAPISIXDeploymentTemplate), 
s.opts.IngressAPISIXReplicas, s.namespace,
+               ingressAPISIXDeployment = 
fmt.Sprintf(s.FormatRegistry(_ingressAPISIXDeploymentTemplate), 
s.opts.IngressAPISIXReplicas, s.namespace, s.opts.ApisixResourceSyncInterval,
                        s.FormatNamespaceLabel(label), 
s.opts.APISIXRouteVersion, s.opts.APISIXPublishAddress, 
s.opts.EnableGatewayAPI, _volumeMounts, _webhookCertSecret)
        } else {
-               ingressAPISIXDeployment = 
fmt.Sprintf(s.FormatRegistry(_ingressAPISIXDeploymentTemplate), 
s.opts.IngressAPISIXReplicas, s.namespace,
+               ingressAPISIXDeployment = 
fmt.Sprintf(s.FormatRegistry(_ingressAPISIXDeploymentTemplate), 
s.opts.IngressAPISIXReplicas, s.namespace, s.opts.ApisixResourceSyncInterval,
                        s.FormatNamespaceLabel(label), 
s.opts.APISIXRouteVersion, s.opts.APISIXPublishAddress, 
s.opts.EnableGatewayAPI, "", _webhookCertSecret)
        }
 
@@ -527,9 +529,10 @@ func (s *Scaffold) ScaleIngressController(desired int) 
error {
        var ingressDeployment string
        label := fmt.Sprintf("apisix.ingress.watch=%s", s.namespace)
        if s.opts.EnableWebhooks {
-               ingressDeployment = 
fmt.Sprintf(s.FormatRegistry(_ingressAPISIXDeploymentTemplate), desired, 
s.namespace, label, s.opts.APISIXRouteVersion, s.opts.APISIXPublishAddress, 
s.opts.EnableGatewayAPI, _volumeMounts, _webhookCertSecret)
+
+               ingressDeployment = 
fmt.Sprintf(s.FormatRegistry(_ingressAPISIXDeploymentTemplate), desired, 
s.namespace, s.opts.ApisixResourceSyncInterval, label, 
s.opts.APISIXRouteVersion, s.opts.APISIXPublishAddress, 
s.opts.EnableGatewayAPI, _volumeMounts, _webhookCertSecret)
        } else {
-               ingressDeployment = 
fmt.Sprintf(s.FormatRegistry(_ingressAPISIXDeploymentTemplate), desired, 
s.namespace, label, s.opts.APISIXRouteVersion, s.opts.APISIXPublishAddress, 
s.opts.EnableGatewayAPI, "", _webhookCertSecret)
+               ingressDeployment = 
fmt.Sprintf(s.FormatRegistry(_ingressAPISIXDeploymentTemplate), desired, 
s.namespace, s.opts.ApisixResourceSyncInterval, label, 
s.opts.APISIXRouteVersion, s.opts.APISIXPublishAddress, 
s.opts.EnableGatewayAPI, "", _webhookCertSecret)
        }
        if err := k8s.KubectlApplyFromStringE(s.t, s.kubectlOptions, 
ingressDeployment); err != nil {
                return err
diff --git a/test/e2e/scaffold/k8s.go b/test/e2e/scaffold/k8s.go
index b788c92f..e9c6cbb1 100644
--- a/test/e2e/scaffold/k8s.go
+++ b/test/e2e/scaffold/k8s.go
@@ -252,7 +252,7 @@ func (s *Scaffold) 
EnsureNumApisixPluginConfigCreated(desired int) error {
        return s.ensureNumApisixCRDsCreated(u.String(), desired)
 }
 
-// CreateApisixRouteByApisixAdmin create a route
+// CreateApisixRouteByApisixAdmin create or update a route
 func (s *Scaffold) CreateApisixRouteByApisixAdmin(routeID string, body []byte) 
error {
        u := url.URL{
                Scheme: "http",
@@ -262,6 +262,16 @@ func (s *Scaffold) CreateApisixRouteByApisixAdmin(routeID 
string, body []byte) e
        return s.ensureAdminOperationIsSuccessful(u.String(), "PUT", body)
 }
 
+// CreateApisixRouteByApisixAdmin create or update a consumer
+func (s *Scaffold) CreateApisixConsumerByApisixAdmin(body []byte) error {
+       u := url.URL{
+               Scheme: "http",
+               Host:   s.apisixAdminTunnel.Endpoint(),
+               Path:   "/apisix/admin/consumers",
+       }
+       return s.ensureAdminOperationIsSuccessful(u.String(), "PUT", body)
+}
+
 // DeleteApisixRouteByApisixAdmin deletes a route by its route name in APISIX 
cluster.
 func (s *Scaffold) DeleteApisixRouteByApisixAdmin(routeID string) error {
        u := url.URL{
@@ -272,6 +282,16 @@ func (s *Scaffold) DeleteApisixRouteByApisixAdmin(routeID 
string) error {
        return s.ensureAdminOperationIsSuccessful(u.String(), "DELETE", nil)
 }
 
+// DeleteApisixConsumerByApisixAdmin deletes a consumer by its consumer name 
in APISIX cluster.
+func (s *Scaffold) DeleteApisixConsumerByApisixAdmin(consumerName string) 
error {
+       u := url.URL{
+               Scheme: "http",
+               Host:   s.apisixAdminTunnel.Endpoint(),
+               Path:   "/apisix/admin/consumers/" + consumerName,
+       }
+       return s.ensureAdminOperationIsSuccessful(u.String(), "DELETE", nil)
+}
+
 func (s *Scaffold) ensureAdminOperationIsSuccessful(url, method string, body 
[]byte) error {
        condFunc := func() (bool, error) {
                req, err := http.NewRequest(method, url, bytes.NewBuffer(body))
diff --git a/test/e2e/scaffold/scaffold.go b/test/e2e/scaffold/scaffold.go
index 01d4f50c..ee29fcc5 100644
--- a/test/e2e/scaffold/scaffold.go
+++ b/test/e2e/scaffold/scaffold.go
@@ -61,6 +61,7 @@ type Options struct {
        EnableWebhooks             bool
        APISIXPublishAddress       string
        disableNamespaceSelector   bool
+       ApisixResourceSyncInterval string
        EnableGatewayAPI           bool
 }
 
@@ -127,6 +128,9 @@ func NewScaffold(o *Options) *Scaffold {
        if o.APISIXAdminAPIKey == "" {
                o.APISIXAdminAPIKey = "edd1c9f034335f136f87ad84b625c8f1"
        }
+       if o.ApisixResourceSyncInterval == "" {
+               o.ApisixResourceSyncInterval = "300s"
+       }
        defer ginkgo.GinkgoRecover()
 
        s := &Scaffold{
diff --git a/test/e2e/suite-ingress/resourcesync.go 
b/test/e2e/suite-ingress/resourcesync.go
new file mode 100644
index 00000000..559c36fe
--- /dev/null
+++ b/test/e2e/suite-ingress/resourcesync.go
@@ -0,0 +1,227 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package ingress
+
+import (
+       "encoding/json"
+       "fmt"
+       "net/http"
+       "time"
+
+       ginkgo "github.com/onsi/ginkgo/v2"
+       "github.com/stretchr/testify/assert"
+
+       "github.com/apache/apisix-ingress-controller/pkg/id"
+       "github.com/apache/apisix-ingress-controller/test/e2e/scaffold"
+)
+
+var _ = ginkgo.Describe("suite-ingress: apisix resource sync", func() {
+       opts := &scaffold.Options{
+               Name:                       "default",
+               Kubeconfig:                 scaffold.GetKubeconfig(),
+               APISIXConfigPath:           "testdata/apisix-gw-config.yaml",
+               IngressAPISIXReplicas:      1,
+               HTTPBinServicePort:         80,
+               APISIXRouteVersion:         "apisix.apache.org/v2beta3",
+               ApisixResourceSyncInterval: "60s",
+       }
+       s := scaffold.NewScaffold(opts)
+       ginkgo.JustBeforeEach(func() {
+               backendSvc, backendPorts := s.DefaultHTTPBackend()
+               // Create ApisixRoute resource
+               ar := fmt.Sprintf(`
+apiVersion: apisix.apache.org/v2beta3
+kind: ApisixRoute
+metadata:
+ name: httpbin-route
+spec:
+ http:
+ - name: rule1
+   match:
+     hosts:
+     - httpbin.org
+     paths:
+       - /ip
+   backends:
+   - serviceName: %s
+     servicePort: %d
+   authentication:
+     enable: true
+     type: keyAuth
+`, backendSvc, backendPorts[0])
+               assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(ar))
+               err := s.EnsureNumApisixUpstreamsCreated(1)
+               assert.Nil(ginkgo.GinkgoT(), err, "Checking number of 
upstreams")
+               err = s.EnsureNumApisixRoutesCreated(1)
+               assert.Nil(ginkgo.GinkgoT(), err, "Checking number of routes")
+
+               // Create Ingress resource
+               ing := fmt.Sprintf(`
+apiVersion: networking.k8s.io/v1
+kind: Ingress
+metadata:
+  annotations:
+    kubernetes.io/ingress.class: apisix
+  name: ingress-route
+spec:
+  rules:
+  - host: local.httpbin.org
+    http:
+      paths:
+      - path: /headers
+        pathType: Exact
+        backend:
+          service:
+            name: %s
+            port:
+              number: %d
+`, backendSvc, backendPorts[0])
+               assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(ing))
+
+               // Create ApisixConsumer resource
+               err = s.ApisixConsumerKeyAuthCreated("foo", "foo-key")
+               assert.Nil(ginkgo.GinkgoT(), err)
+       })
+
+       ginkgo.It("for modified resource sync consistency", func() {
+               // crd resource sync interval
+               readyTime := time.Now().Add(60 * time.Second)
+
+               routes, _ := s.ListApisixRoutes()
+               assert.Len(ginkgo.GinkgoT(), routes, 2)
+
+               consumers, _ := s.ListApisixConsumers()
+               assert.Len(ginkgo.GinkgoT(), consumers, 1)
+
+               for _, route := range routes {
+                       _ = 
s.CreateApisixRouteByApisixAdmin(id.GenID(route.Name), []byte(`
+{
+       "methods": ["GET"],
+       "uri": "/anything",
+       "plugins": {
+               "key-auth": {}
+       },
+       "upstream": {
+               "type": "roundrobin",
+               "nodes": {
+                       "httpbin.org": 1
+               }
+       }
+}`))
+               }
+
+               for _, consumer := range consumers {
+                       _ = 
s.CreateApisixConsumerByApisixAdmin([]byte(fmt.Sprintf(`
+{
+       "username": "%s",
+       "plugins": {
+               "key-auth": {
+                       "key": "auth-one"
+               }
+       }
+}`, consumer.Username)))
+               }
+
+               _ = s.NewAPISIXClient().
+                       GET("/ip").
+                       WithHeader("Host", "httpbin.org").
+                       Expect().
+                       Status(http.StatusNotFound)
+
+               _ = s.NewAPISIXClient().
+                       GET("/headers").
+                       WithHeader("Host", "local.httpbin.org").
+                       Expect().
+                       Status(http.StatusNotFound)
+
+               waitTime := time.Until(readyTime).Seconds()
+               time.Sleep(time.Duration(waitTime) * time.Second)
+
+               _ = s.NewAPISIXClient().
+                       GET("/ip").
+                       WithHeader("Host", "httpbin.org").
+                       WithHeader("apikey", "foo-key").
+                       Expect().
+                       Status(http.StatusOK)
+
+               _ = s.NewAPISIXClient().
+                       GET("/headers").
+                       WithHeader("Host", "local.httpbin.org").
+                       Expect().
+                       Status(http.StatusOK)
+
+               consumers, _ = s.ListApisixConsumers()
+               assert.Len(ginkgo.GinkgoT(), consumers, 1)
+               data, _ := json.Marshal(consumers[0])
+               assert.Contains(ginkgo.GinkgoT(), string(data), "foo-key")
+       })
+
+       ginkgo.It("for deleted resource sync consistency", func() {
+               // crd resource sync interval
+               readyTime := time.Now().Add(60 * time.Second)
+
+               routes, _ := s.ListApisixRoutes()
+               assert.Len(ginkgo.GinkgoT(), routes, 2)
+
+               consumers, _ := s.ListApisixConsumers()
+               assert.Len(ginkgo.GinkgoT(), consumers, 1)
+
+               for _, route := range routes {
+                       _ = 
s.DeleteApisixRouteByApisixAdmin(id.GenID(route.Name))
+               }
+
+               for _, consumer := range consumers {
+                       s.DeleteApisixConsumerByApisixAdmin(consumer.Username)
+               }
+
+               _ = s.NewAPISIXClient().
+                       GET("/ip").
+                       WithHeader("Host", "httpbin.org").
+                       Expect().
+                       Status(http.StatusNotFound)
+
+               _ = s.NewAPISIXClient().
+                       GET("/headers").
+                       WithHeader("Host", "local.httpbin.org").
+                       Expect().
+                       Status(http.StatusNotFound)
+
+               routes, _ = s.ListApisixRoutes()
+               assert.Len(ginkgo.GinkgoT(), routes, 0)
+               consumers, _ = s.ListApisixConsumers()
+               assert.Len(ginkgo.GinkgoT(), consumers, 0)
+
+               waitTime := time.Until(readyTime).Seconds()
+               time.Sleep(time.Duration(waitTime) * time.Second)
+
+               _ = s.NewAPISIXClient().
+                       GET("/ip").
+                       WithHeader("Host", "httpbin.org").
+                       WithHeader("apikey", "foo-key").
+                       Expect().
+                       Status(http.StatusOK)
+
+               _ = s.NewAPISIXClient().
+                       GET("/headers").
+                       WithHeader("Host", "local.httpbin.org").
+                       Expect().
+                       Status(http.StatusOK)
+
+               consumers, _ = s.ListApisixConsumers()
+               assert.Len(ginkgo.GinkgoT(), consumers, 1)
+               data, _ := json.Marshal(consumers[0])
+               assert.Contains(ginkgo.GinkgoT(), string(data), "foo-key")
+       })
+})

Reply via email to