This is an automated email from the ASF dual-hosted git repository.

kvn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git


The following commit(s) were added to refs/heads/master by this push:
     new 819b003  fix: delete the cluster object when give up the leadership 
(#774)
819b003 is described below

commit 819b00318e8cd9b6639913301fb89d2acb168926
Author: Alex Zhang <[email protected]>
AuthorDate: Fri Dec 24 17:54:16 2021 +0800

    fix: delete the cluster object when give up the leadership (#774)
---
 pkg/apisix/apisix.go          | 11 ++++++
 pkg/apisix/cluster.go         | 12 ++++--
 pkg/ingress/controller.go     |  9 +++++
 test/e2e/chaos/chaos.go       | 85 +++++++++++++++++++++++++++++++++++++++++++
 test/e2e/e2e.go               |  1 +
 test/e2e/scaffold/k8s.go      |  9 +++++
 test/e2e/scaffold/scaffold.go | 37 ++++++++++++++++++-
 7 files changed, 160 insertions(+), 4 deletions(-)

diff --git a/pkg/apisix/apisix.go b/pkg/apisix/apisix.go
index 617af3c..838b286 100644
--- a/pkg/apisix/apisix.go
+++ b/pkg/apisix/apisix.go
@@ -32,6 +32,8 @@ type APISIX interface {
        UpdateCluster(context.Context, *ClusterOptions) error
        // ListClusters lists all APISIX clusters.
        ListClusters() []Cluster
+       // DeleteCluster deletes the target APISIX cluster by its name.
+       DeleteCluster(string)
 }
 
 // Cluster defines specific operations that can be applied in an APISIX
@@ -214,3 +216,12 @@ func (c *apisix) UpdateCluster(ctx context.Context, co 
*ClusterOptions) error {
        c.clusters[co.Name] = cluster
        return nil
 }
+
+// DeleteCluster drops the named APISIX cluster from the cluster index; it is
+// a no-op when no cluster with that name is registered.
+func (c *apisix) DeleteCluster(name string) {
+       c.mu.Lock()
+       // The cluster itself is not closed or freed, only its index entry goes.
+       delete(c.clusters, name)
+       c.mu.Unlock()
+}
diff --git a/pkg/apisix/cluster.go b/pkg/apisix/cluster.go
index 194a906..0106dec 100644
--- a/pkg/apisix/cluster.go
+++ b/pkg/apisix/cluster.go
@@ -179,10 +179,16 @@ func (c *cluster) syncCache(ctx context.Context) {
                Steps:    5,
        }
        var lastSyncErr error
-       err := wait.ExponentialBackoff(backoff, func() (done bool, _ error) {
+       err := wait.ExponentialBackoff(backoff, func() (done bool, err error) {
                // impossibly return: false, nil
                // so can safe used
                done, lastSyncErr = c.syncCacheOnce(ctx)
+               select {
+               case <-ctx.Done():
+                       err = context.Canceled
+               default:
+                       break
+               }
                return
        })
        if err != nil {
@@ -199,7 +205,7 @@ func (c *cluster) syncCache(ctx context.Context) {
 func (c *cluster) syncCacheOnce(ctx context.Context) (bool, error) {
        routes, err := c.route.List(ctx)
        if err != nil {
-               log.Errorf("failed to list route in APISIX: %s", err)
+               log.Errorf("failed to list routes in APISIX: %s", err)
                return false, err
        }
        upstreams, err := c.upstream.List(ctx)
@@ -329,7 +335,7 @@ func (c *cluster) syncSchema(ctx context.Context, interval 
time.Duration) {
 
        for {
                if err := c.syncSchemaOnce(ctx); err != nil {
-                       log.Warnf("failed to sync schema: %s", err)
+                       log.Errorf("failed to sync schema: %s", err)
                        c.metricsCollector.IncrSyncOperation("schema", 
"failure")
                }
 
diff --git a/pkg/ingress/controller.go b/pkg/ingress/controller.go
index 20e0829..b7686ee 100644
--- a/pkg/ingress/controller.go
+++ b/pkg/ingress/controller.go
@@ -353,6 +353,11 @@ func (c *Controller) Run(stop chan struct{}) error {
                                                zap.String("namespace", 
c.namespace),
                                                zap.String("pod", c.name),
                                        )
+                                       c.MetricsCollector.ResetLeader(false)
+                                       // delete the old APISIX cluster, so 
that the cached state
+                                       // like synchronization won't be used 
next time the candidate
+                                       // becomes the leader again.
+                                       
c.apisix.DeleteCluster(c.cfg.APISIX.DefaultClusterName)
                                }
                        },
                        OnStoppedLeading: func() {
@@ -361,6 +366,10 @@ func (c *Controller) Run(stop chan struct{}) error {
                                        zap.String("pod", c.name),
                                )
                                c.MetricsCollector.ResetLeader(false)
+                               // delete the old APISIX cluster, so that the 
cached state
+                               // like synchronization won't be used next time 
the candidate
+                               // becomes the leader again.
+                               
c.apisix.DeleteCluster(c.cfg.APISIX.DefaultClusterName)
                        },
                },
                // Set it to false as current leaderelection implementation 
will report
diff --git a/test/e2e/chaos/chaos.go b/test/e2e/chaos/chaos.go
new file mode 100644
index 0000000..4b152e4
--- /dev/null
+++ b/test/e2e/chaos/chaos.go
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package chaos
+
+import (
+       "fmt"
+       "net/http"
+
+       "github.com/onsi/ginkgo"
+       "github.com/stretchr/testify/assert"
+
+       "github.com/apache/apisix-ingress-controller/test/e2e/scaffold"
+)
+
+var _ = ginkgo.Describe("Chaos Testing", func() {
+       opts := &scaffold.Options{
+               Name:                  "default",
+               Kubeconfig:            scaffold.GetKubeconfig(),
+               APISIXConfigPath:      "testdata/apisix-gw-config.yaml",
+               IngressAPISIXReplicas: 1,
+               HTTPBinServicePort:    80,
+               APISIXRouteVersion:    "apisix.apache.org/v2beta2",
+       }
+       s := scaffold.NewScaffold(opts)
+       // Restart APISIX between two route creations; both routes must be served afterwards.
+       ginkgo.Context("simulate apisix deployment restart", func() {
+               ginkgo.Specify("ingress controller can synchronize rules normally after apisix recovery", func() {
+                       assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixRoutesCreated(0), "checking number of routes")
+                       backendSvc, backendSvcPort := s.DefaultHTTPBackend()
+                       route1 := fmt.Sprintf(`
+apiVersion: apisix.apache.org/v2beta2
+kind: ApisixRoute
+metadata:
+  name: httpbin-route1
+spec:
+  http:
+  - name: route1
+    match:
+      hosts:
+      - httpbin.org
+      paths:
+      - /ip
+    backends:
+    - serviceName: %s
+      servicePort: %d
+`, backendSvc, backendSvcPort[0])
+                       assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(route1))
+                       assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixRoutesCreated(1), "checking number of routes")
+                       s.RestartAPISIXDeploy()
+                       route2 := fmt.Sprintf(`
+apiVersion: apisix.apache.org/v2beta2
+kind: ApisixRoute
+metadata:
+  name: httpbin-route2
+spec:
+  http:
+  - name: route2
+    match:
+      hosts:
+      - httpbin.org
+      paths:
+      - /get
+    backends:
+    - serviceName: %s
+      servicePort: %d
+`, backendSvc, backendSvcPort[0])
+                       assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(route2))
+                       assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixRoutesCreated(2), "checking number of routes")
+                       s.NewAPISIXClient().GET("/ip").WithHeader("Host", "httpbin.org").Expect().Status(http.StatusOK)
+                       s.NewAPISIXClient().GET("/get").WithHeader("Host", "httpbin.org").Expect().Status(http.StatusOK)
+               })
+       })
+})
diff --git a/test/e2e/e2e.go b/test/e2e/e2e.go
index 07cf074..651a1c6 100644
--- a/test/e2e/e2e.go
+++ b/test/e2e/e2e.go
@@ -16,6 +16,7 @@ package e2e
 
 import (
        _ "github.com/apache/apisix-ingress-controller/test/e2e/annotations"
+       _ "github.com/apache/apisix-ingress-controller/test/e2e/chaos"
        _ "github.com/apache/apisix-ingress-controller/test/e2e/config"
        _ "github.com/apache/apisix-ingress-controller/test/e2e/endpoints"
        _ "github.com/apache/apisix-ingress-controller/test/e2e/features"
diff --git a/test/e2e/scaffold/k8s.go b/test/e2e/scaffold/k8s.go
index 842c343..8362144 100644
--- a/test/e2e/scaffold/k8s.go
+++ b/test/e2e/scaffold/k8s.go
@@ -474,6 +474,15 @@ func (s *Scaffold) newAPISIXTunnels() error {
        return nil
 }
 
+// shutdownApisixTunnel closes the scaffold's APISIX tunnels one after another:
+// admin, HTTP, HTTPS, TCP, UDP and control.
+// NOTE(review): a nil tunnel, or a Close on an already-closed tunnel, may panic
+// and skip the remaining closes — confirm callers only invoke this on live tunnels.
+func (s *Scaffold) shutdownApisixTunnel() {
+       s.apisixAdminTunnel.Close()
+       s.apisixHttpTunnel.Close()
+       s.apisixHttpsTunnel.Close()
+       s.apisixTCPTunnel.Close()
+       s.apisixUDPTunnel.Close()
+       s.apisixControlTunnel.Close()
+}
+
 // Namespace returns the current working namespace.
 func (s *Scaffold) Namespace() string {
        return s.kubectlOptions.Namespace
diff --git a/test/e2e/scaffold/scaffold.go b/test/e2e/scaffold/scaffold.go
index f927f96..7d5fa08 100644
--- a/test/e2e/scaffold/scaffold.go
+++ b/test/e2e/scaffold/scaffold.go
@@ -289,6 +289,23 @@ func (s *Scaffold) APISIXGatewayServiceEndpoint() string {
        return s.apisixHttpTunnel.Endpoint()
 }
 
+// RestartAPISIXDeploy restarts APISIX by killing every pod labelled
+// app=apisix-deployment-e2e-test and waiting until all APISIX pods are
+// available again. The APISIX tunnels are closed first and re-created once
+// the new pods are ready. Failures are reported through assert and do not
+// abort the sequence.
+func (s *Scaffold) RestartAPISIXDeploy() {
+       s.shutdownApisixTunnel()
+
+       selector := metav1.ListOptions{LabelSelector: "app=apisix-deployment-e2e-test"}
+       pods, err := k8s.ListPodsE(s.t, s.kubectlOptions, selector)
+       assert.NoError(s.t, err, "list apisix pod")
+       for i := range pods {
+               assert.NoError(s.t, s.KillPod(pods[i].Name), "killing apisix pod")
+       }
+
+       assert.NoError(s.t, s.waitAllAPISIXPodsAvailable(), "waiting for new apisix instance ready")
+       assert.NoError(s.t, s.newAPISIXTunnels(), "renew apisix tunnels")
+}
+
 func (s *Scaffold) beforeEach() {
        var err error
        s.namespace = fmt.Sprintf("ingress-apisix-e2e-tests-%s-%d", 
s.opts.Name, time.Now().Nanosecond())
@@ -371,7 +388,7 @@ func (s *Scaffold) afterEach() {
        assert.Nilf(ginkgo.GinkgoT(), err, "deleting namespace %s", s.namespace)
 
        for _, f := range s.finializers {
-               f()
+               runWithRecover(f)
        }
 
        // Wait for a while to prevent the worker node being overwhelming
@@ -379,6 +396,24 @@ func (s *Scaffold) afterEach() {
        time.Sleep(3 * time.Second)
 }
 
+func runWithRecover(f func()) {
+       defer func() {
+               r := recover()
+               if r == nil {
+                       return
+               }
+               err, ok := r.(error)
+               if ok {
+                       // just ignore already closed channel
+                       if strings.Contains(err.Error(), "close of closed 
channel") {
+                               return
+                       }
+               }
+               panic(r)
+       }()
+       f()
+}
+
 func (s *Scaffold) GetDeploymentLogs(name string) string {
        cli, err := k8s.GetKubernetesClientE(s.t)
        if err != nil {

Reply via email to