This is an automated email from the ASF dual-hosted git repository.

kvn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git


The following commit(s) were added to refs/heads/master by this push:
     new 957c315  feat: add full compare when ingress startup (#680)
957c315 is described below

commit 957c31522e1b1e5f8ef9cab7eb244473a4e0f675
Author: kv <[email protected]>
AuthorDate: Fri Sep 24 18:00:46 2021 +0800

    feat: add full compare when ingress startup (#680)
---
 pkg/ingress/compare.go        | 246 ++++++++++++++++++++++++++++++++++++++++++
 pkg/ingress/controller.go     |   7 ++
 pkg/ingress/pod.go            |   3 +-
 test/e2e/go.mod               |   1 -
 test/e2e/ingress/compare.go   |  74 +++++++++++++
 test/e2e/scaffold/ingress.go  |  18 ++++
 test/e2e/scaffold/scaffold.go |   6 +-
 7 files changed, 350 insertions(+), 5 deletions(-)

diff --git a/pkg/ingress/compare.go b/pkg/ingress/compare.go
new file mode 100644
index 0000000..badad42
--- /dev/null
+++ b/pkg/ingress/compare.go
@@ -0,0 +1,246 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package ingress
+
+import (
+       "context"
+       "sync"
+
+       v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+       "github.com/apache/apisix-ingress-controller/pkg/log"
+)
+
+// CompareResources used to compare the object IDs in resources and APISIX
+// Find out the rest of objects in APISIX
+// AND warn them in log.
+func (c *Controller) CompareResources(ctx context.Context) error {
+       var (
+               wg                sync.WaitGroup
+               routeMapK8S       = new(sync.Map)
+               streamRouteMapK8S = new(sync.Map)
+               upstreamMapK8S    = new(sync.Map)
+               sslMapK8S         = new(sync.Map)
+               consumerMapK8S    = new(sync.Map)
+
+               routeMapA6       = make(map[string]string)
+               streamRouteMapA6 = make(map[string]string)
+               upstreamMapA6    = make(map[string]string)
+               sslMapA6         = make(map[string]string)
+               consumerMapA6    = make(map[string]string)
+       )
+       // watchingNamespace == nil means to monitor all namespaces
+       if c.watchingNamespace == nil {
+               opts := v1.ListOptions{}
+               // list all namespaces
+               nsList, err := 
c.kubeClient.Client.CoreV1().Namespaces().List(ctx, opts)
+               if err != nil {
+                       log.Error(err.Error())
+                       ctx.Done()
+               } else {
+                       wns := make(map[string]struct{}, len(nsList.Items))
+                       for _, v := range nsList.Items {
+                               wns[v.Name] = struct{}{}
+                       }
+                       c.watchingNamespace = wns
+               }
+       }
+       if len(c.watchingNamespace) > 0 {
+               wg.Add(len(c.watchingNamespace))
+       }
+       for ns := range c.watchingNamespace {
+               go func(ns string) {
+                       defer wg.Done()
+                       // ApisixRoute
+                       opts := v1.ListOptions{}
+                       retRoutes, err := 
c.kubeClient.APISIXClient.ApisixV2beta1().ApisixRoutes(ns).List(ctx, opts)
+                       if err != nil {
+                               log.Error(err.Error())
+                               ctx.Done()
+                       } else {
+                               for _, r := range retRoutes.Items {
+                                       tc, err := 
c.translator.TranslateRouteV2beta1NotStrictly(&r)
+                                       if err != nil {
+                                               log.Error(err.Error())
+                                               ctx.Done()
+                                       } else {
+                                               // routes
+                                               for _, route := range tc.Routes 
{
+                                                       
routeMapK8S.Store(route.ID, route.ID)
+                                               }
+                                               // streamRoutes
+                                               for _, stRoute := range 
tc.StreamRoutes {
+                                                       
streamRouteMapK8S.Store(stRoute.ID, stRoute.ID)
+                                               }
+                                               // upstreams
+                                               for _, upstream := range 
tc.Upstreams {
+                                                       
upstreamMapK8S.Store(upstream.ID, upstream.ID)
+                                               }
+                                               // ssl
+                                               for _, ssl := range tc.SSL {
+                                                       sslMapK8S.Store(ssl.ID, 
ssl.ID)
+                                               }
+                                       }
+                               }
+                       }
+                       // todo ApisixUpstream
+                       // ApisixUpstream should be synced with ApisixRoute 
resource
+
+                       // ApisixSSL
+                       retSSL, err := 
c.kubeClient.APISIXClient.ApisixV1().ApisixTlses(ns).List(ctx, opts)
+                       if err != nil {
+                               log.Error(err.Error())
+                               ctx.Done()
+                       } else {
+                               for _, s := range retSSL.Items {
+                                       ssl, err := 
c.translator.TranslateSSL(&s)
+                                       if err != nil {
+                                               log.Error(err.Error())
+                                               ctx.Done()
+                                       } else {
+                                               sslMapK8S.Store(ssl.ID, ssl.ID)
+                                       }
+                               }
+                       }
+                       // ApisixConsumer
+                       retConsumer, err := 
c.kubeClient.APISIXClient.ApisixV2alpha1().ApisixConsumers(ns).List(ctx, opts)
+                       if err != nil {
+                               log.Error(err.Error())
+                               ctx.Done()
+                       } else {
+                               for _, con := range retConsumer.Items {
+                                       consumer, err := 
c.translator.TranslateApisixConsumer(&con)
+                                       if err != nil {
+                                               log.Error(err.Error())
+                                               ctx.Done()
+                                       } else {
+                                               
consumerMapK8S.Store(consumer.Username, consumer.Username)
+                                       }
+                               }
+                       }
+               }(ns)
+       }
+       wg.Wait()
+
+       // 2.get all cache routes
+       if err := c.listRouteCache(ctx, routeMapA6); err != nil {
+               return err
+       }
+       if err := c.listStreamRouteCache(ctx, streamRouteMapA6); err != nil {
+               return err
+       }
+       if err := c.listUpstreamCache(ctx, upstreamMapA6); err != nil {
+               return err
+       }
+       if err := c.listSSLCache(ctx, sslMapA6); err != nil {
+               return err
+       }
+       if err := c.listConsumerCache(ctx, consumerMapA6); err != nil {
+               return err
+       }
+       // 3.compare
+       routeReult := findRedundant(routeMapA6, routeMapK8S)
+       streamRouteReult := findRedundant(streamRouteMapA6, streamRouteMapK8S)
+       upstreamReult := findRedundant(upstreamMapA6, upstreamMapK8S)
+       sslReult := findRedundant(sslMapA6, sslMapK8S)
+       consuemrReult := findRedundant(consumerMapA6, consumerMapK8S)
+       // 4.warn
+       warnRedundantResources(routeReult, "route")
+       warnRedundantResources(streamRouteReult, "streamRoute")
+       warnRedundantResources(upstreamReult, "upstream")
+       warnRedundantResources(sslReult, "ssl")
+       warnRedundantResources(consuemrReult, "consumer")
+
+       return nil
+}
+
+// log warn
+func warnRedundantResources(resources map[string]string, t string) {
+       for k := range resources {
+               log.Warnf("%s: %s in APISIX but do not in declare yaml", t, k)
+       }
+}
+
+// findRedundant find redundant item which in src and do not in dest
+func findRedundant(src map[string]string, dest *sync.Map) map[string]string {
+       result := make(map[string]string)
+       for k, v := range src {
+               _, ok := dest.Load(k)
+               if !ok {
+                       result[k] = v
+               }
+       }
+       return result
+}
+
+func (c *Controller) listRouteCache(ctx context.Context, routeMapA6 
map[string]string) error {
+       routesInA6, err := 
c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).Route().List(ctx)
+       if err != nil {
+               return err
+       } else {
+               for _, ra := range routesInA6 {
+                       routeMapA6[ra.ID] = ra.ID
+               }
+       }
+       return nil
+}
+
+func (c *Controller) listStreamRouteCache(ctx context.Context, 
streamRouteMapA6 map[string]string) error {
+       streamRoutesInA6, err := 
c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).StreamRoute().List(ctx)
+       if err != nil {
+               return err
+       } else {
+               for _, ra := range streamRoutesInA6 {
+                       streamRouteMapA6[ra.ID] = ra.ID
+               }
+       }
+       return nil
+}
+
+func (c *Controller) listUpstreamCache(ctx context.Context, upstreamMapA6 
map[string]string) error {
+       upstreamsInA6, err := 
c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).Upstream().List(ctx)
+       if err != nil {
+               return err
+       } else {
+               for _, ra := range upstreamsInA6 {
+                       upstreamMapA6[ra.ID] = ra.ID
+               }
+       }
+       return nil
+}
+
+func (c *Controller) listSSLCache(ctx context.Context, sslMapA6 
map[string]string) error {
+       sslInA6, err := 
c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).SSL().List(ctx)
+       if err != nil {
+               return err
+       } else {
+               for _, s := range sslInA6 {
+                       sslMapA6[s.ID] = s.ID
+               }
+       }
+       return nil
+}
+
+func (c *Controller) listConsumerCache(ctx context.Context, consumerMapA6 
map[string]string) error {
+       consumerInA6, err := 
c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).Consumer().List(ctx)
+       if err != nil {
+               return err
+       } else {
+               for _, con := range consumerInA6 {
+                       consumerMapA6[con.Username] = con.Username
+               }
+       }
+       return nil
+}
diff --git a/pkg/ingress/controller.go b/pkg/ingress/controller.go
index 0b83a8e..b3312d0 100644
--- a/pkg/ingress/controller.go
+++ b/pkg/ingress/controller.go
@@ -402,6 +402,12 @@ func (c *Controller) run(ctx context.Context) {
 
        c.initWhenStartLeading()
 
+       // compare resources of k8s with objects of APISIX
+       if err = c.CompareResources(ctx); err != nil {
+               ctx.Done()
+               return
+       }
+
        c.goAttach(func() {
                c.checkClusterHealth(ctx, cancelFunc)
        })
@@ -418,6 +424,7 @@ func (c *Controller) run(ctx context.Context) {
                c.ingressInformer.Run(ctx.Done())
        })
        c.goAttach(func() {
+
                c.apisixRouteInformer.Run(ctx.Done())
        })
        c.goAttach(func() {
diff --git a/pkg/ingress/pod.go b/pkg/ingress/pod.go
index a5b1cd2..efaf881 100644
--- a/pkg/ingress/pod.go
+++ b/pkg/ingress/pod.go
@@ -89,7 +89,8 @@ func (c *podController) onUpdate(_, cur interface{}) {
                return
        }
        log.Debugw("pod update event arrived",
-               zap.Any("final state", pod),
+               zap.Any("pod namespace", pod.Namespace),
+               zap.Any("pod name", pod.Name),
        )
        if pod.DeletionTimestamp != nil {
                if err := c.controller.podCache.Delete(pod); err != nil {
diff --git a/test/e2e/go.mod b/test/e2e/go.mod
index 7e9fd40..4c48e5e 100644
--- a/test/e2e/go.mod
+++ b/test/e2e/go.mod
@@ -8,7 +8,6 @@ require (
        github.com/gorilla/websocket v1.4.2
        github.com/gruntwork-io/terratest v0.32.8
        github.com/onsi/ginkgo v1.16.4
-       github.com/onsi/gomega v1.10.1
        github.com/stretchr/testify v1.7.0
        k8s.io/api v0.21.1
        k8s.io/apimachinery v0.21.1
diff --git a/test/e2e/ingress/compare.go b/test/e2e/ingress/compare.go
new file mode 100644
index 0000000..ce57855
--- /dev/null
+++ b/test/e2e/ingress/compare.go
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package ingress
+
+import (
+       "fmt"
+       "time"
+
+       "github.com/onsi/ginkgo"
+       "github.com/stretchr/testify/assert"
+
+       "github.com/apache/apisix-ingress-controller/test/e2e/scaffold"
+)
+
+var _ = ginkgo.Describe("Testing compare resources", func() {
+       opts := &scaffold.Options{
+               Name:                  "default",
+               Kubeconfig:            scaffold.GetKubeconfig(),
+               APISIXConfigPath:      "testdata/apisix-gw-config.yaml",
+               IngressAPISIXReplicas: 1,
+               HTTPBinServicePort:    80,
+               APISIXRouteVersion:    "apisix.apache.org/v2beta1",
+       }
+       s := scaffold.NewScaffold(opts)
+       ginkgo.It("Compare and find out the redundant objects in APISIX, and 
remove them", func() {
+               backendSvc, backendSvcPort := s.DefaultHTTPBackend()
+               apisixRoute := fmt.Sprintf(`
+apiVersion: apisix.apache.org/v2beta1
+kind: ApisixRoute
+metadata:
+  name: httpbin-route
+spec:
+  http:
+  - name: rule1
+    match:
+      hosts:
+      - httpbin.com
+      paths:
+      - /ip
+    backend:
+      serviceName: %s
+      servicePort: %d
+`, backendSvc, backendSvcPort[0])
+               assert.Nil(ginkgo.GinkgoT(), 
s.CreateResourceFromString(apisixRoute))
+
+               err := s.EnsureNumApisixRoutesCreated(1)
+               assert.Nil(ginkgo.GinkgoT(), err, "Checking number of routes")
+               err = s.EnsureNumApisixUpstreamsCreated(1)
+               assert.Nil(ginkgo.GinkgoT(), err, "Checking number of 
upstreams")
+               // scale Ingres Controller --replicas=0
+               assert.Nil(ginkgo.GinkgoT(), s.ScaleIngressController(0), 
"scaling ingress controller instances = 0")
+               // remove ApisixRoute resource
+               assert.Nil(ginkgo.GinkgoT(), 
s.RemoveResourceByString(apisixRoute))
+               // scale Ingres Controller --replicas=1
+               assert.Nil(ginkgo.GinkgoT(), s.ScaleIngressController(1), 
"scaling ingress controller instances = 1")
+               time.Sleep(15 * time.Second)
+               // should find the warn log
+               output := 
s.GetDeploymentLogs("ingress-apisix-controller-deployment-e2e-test")
+               fmt.Println(output)
+               assert.Contains(ginkgo.GinkgoT(), output, "in APISIX but do not 
in declare yaml")
+       })
+})
diff --git a/test/e2e/scaffold/ingress.go b/test/e2e/scaffold/ingress.go
index 33e732a..714cf3d 100644
--- a/test/e2e/scaffold/ingress.go
+++ b/test/e2e/scaffold/ingress.go
@@ -19,6 +19,7 @@ import (
        "context"
        "encoding/base64"
        "fmt"
+       "time"
 
        "github.com/gruntwork-io/terratest/modules/k8s"
        "github.com/onsi/ginkgo"
@@ -452,3 +453,20 @@ func (s *Scaffold) GetIngressPodDetails() ([]v1.Pod, 
error) {
                LabelSelector: 
"app=ingress-apisix-controller-deployment-e2e-test",
        })
 }
+
+// ScaleIngressController scales the number of Ingress Controller pods to 
desired.
+func (s *Scaffold) ScaleIngressController(desired int) error {
+       var ingressDeployment string
+       if s.opts.EnableWebhooks {
+               ingressDeployment = 
fmt.Sprintf(_ingressAPISIXDeploymentTemplate, desired, s.namespace, 
s.namespace, s.opts.APISIXRouteVersion, _volumeMounts, _webhookCertSecret)
+       } else {
+               ingressDeployment = 
fmt.Sprintf(_ingressAPISIXDeploymentTemplate, desired, s.namespace, 
s.namespace, s.opts.APISIXRouteVersion, "", _webhookCertSecret)
+       }
+       if err := k8s.KubectlApplyFromStringE(s.t, s.kubectlOptions, 
ingressDeployment); err != nil {
+               return err
+       }
+       if err := k8s.WaitUntilNumPodsCreatedE(s.t, s.kubectlOptions, 
s.labelSelector("app=ingress-apisix-controller-deployment-e2e-test"), desired, 
5, 5*time.Second); err != nil {
+               return err
+       }
+       return nil
+}
diff --git a/test/e2e/scaffold/scaffold.go b/test/e2e/scaffold/scaffold.go
index bde3e8f..865c072 100644
--- a/test/e2e/scaffold/scaffold.go
+++ b/test/e2e/scaffold/scaffold.go
@@ -339,12 +339,12 @@ func (s *Scaffold) afterEach() {
                        _, _ = fmt.Fprintln(ginkgo.GinkgoWriter, output)
                }
                // Get the logs of apisix
-               output = s.getDeploymentLogs("apisix-deployment-e2e-test")
+               output = s.GetDeploymentLogs("apisix-deployment-e2e-test")
                if output != "" {
                        _, _ = fmt.Fprintln(ginkgo.GinkgoWriter, output)
                }
                // Get the logs of ingress
-               output = 
s.getDeploymentLogs("ingress-apisix-controller-deployment-e2e-test")
+               output = 
s.GetDeploymentLogs("ingress-apisix-controller-deployment-e2e-test")
                if output != "" {
                        _, _ = fmt.Fprintln(ginkgo.GinkgoWriter, output)
                }
@@ -362,7 +362,7 @@ func (s *Scaffold) afterEach() {
        time.Sleep(3 * time.Second)
 }
 
-func (s *Scaffold) getDeploymentLogs(name string) string {
+func (s *Scaffold) GetDeploymentLogs(name string) string {
        cli, err := k8s.GetKubernetesClientE(s.t)
        if err != nil {
                assert.Nilf(ginkgo.GinkgoT(), err, "get client error: %s", 
err.Error())

Reply via email to