This is an automated email from the ASF dual-hosted git repository.
kvn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push:
new 819b003 fix: delete the cluster object when give up the leadership (#774)
819b003 is described below
commit 819b00318e8cd9b6639913301fb89d2acb168926
Author: Alex Zhang <[email protected]>
AuthorDate: Fri Dec 24 17:54:16 2021 +0800
fix: delete the cluster object when give up the leadership (#774)
---
pkg/apisix/apisix.go | 11 ++++++
pkg/apisix/cluster.go | 12 ++++--
pkg/ingress/controller.go | 9 +++++
test/e2e/chaos/chaos.go | 85 +++++++++++++++++++++++++++++++++++++++++++
test/e2e/e2e.go | 1 +
test/e2e/scaffold/k8s.go | 9 +++++
test/e2e/scaffold/scaffold.go | 37 ++++++++++++++++++-
7 files changed, 160 insertions(+), 4 deletions(-)
diff --git a/pkg/apisix/apisix.go b/pkg/apisix/apisix.go
index 617af3c..838b286 100644
--- a/pkg/apisix/apisix.go
+++ b/pkg/apisix/apisix.go
@@ -32,6 +32,8 @@ type APISIX interface {
UpdateCluster(context.Context, *ClusterOptions) error
// ListClusters lists all APISIX clusters.
ListClusters() []Cluster
+ // DeleteCluster deletes the target APISIX cluster by its name.
+ DeleteCluster(string)
}
// Cluster defines specific operations that can be applied in an APISIX
@@ -214,3 +216,12 @@ func (c *apisix) UpdateCluster(ctx context.Context, co *ClusterOptions) error {
c.clusters[co.Name] = cluster
return nil
}
+
+func (c *apisix) DeleteCluster(name string) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ // Don't have to close or free some resources in that cluster, so
+ // just delete its index.
+ delete(c.clusters, name)
+}
diff --git a/pkg/apisix/cluster.go b/pkg/apisix/cluster.go
index 194a906..0106dec 100644
--- a/pkg/apisix/cluster.go
+++ b/pkg/apisix/cluster.go
@@ -179,10 +179,16 @@ func (c *cluster) syncCache(ctx context.Context) {
Steps: 5,
}
var lastSyncErr error
- err := wait.ExponentialBackoff(backoff, func() (done bool, _ error) {
+ err := wait.ExponentialBackoff(backoff, func() (done bool, err error) {
// impossibly return: false, nil
// so can safe used
done, lastSyncErr = c.syncCacheOnce(ctx)
+ select {
+ case <-ctx.Done():
+ err = context.Canceled
+ default:
+ break
+ }
return
})
if err != nil {
@@ -199,7 +205,7 @@ func (c *cluster) syncCache(ctx context.Context) {
func (c *cluster) syncCacheOnce(ctx context.Context) (bool, error) {
routes, err := c.route.List(ctx)
if err != nil {
- log.Errorf("failed to list route in APISIX: %s", err)
+ log.Errorf("failed to list routes in APISIX: %s", err)
return false, err
}
upstreams, err := c.upstream.List(ctx)
@@ -329,7 +335,7 @@ func (c *cluster) syncSchema(ctx context.Context, interval time.Duration) {
for {
if err := c.syncSchemaOnce(ctx); err != nil {
- log.Warnf("failed to sync schema: %s", err)
+ log.Errorf("failed to sync schema: %s", err)
c.metricsCollector.IncrSyncOperation("schema", "failure")
}
diff --git a/pkg/ingress/controller.go b/pkg/ingress/controller.go
index 20e0829..b7686ee 100644
--- a/pkg/ingress/controller.go
+++ b/pkg/ingress/controller.go
@@ -353,6 +353,11 @@ func (c *Controller) Run(stop chan struct{}) error {
zap.String("namespace", c.namespace),
zap.String("pod", c.name),
)
+ c.MetricsCollector.ResetLeader(false)
+ // delete the old APISIX cluster, so that the cached state
+ // like synchronization won't be used next time the candidate
+ // becomes the leader again.
+ c.apisix.DeleteCluster(c.cfg.APISIX.DefaultClusterName)
}
},
OnStoppedLeading: func() {
@@ -361,6 +366,10 @@ func (c *Controller) Run(stop chan struct{}) error {
zap.String("pod", c.name),
)
c.MetricsCollector.ResetLeader(false)
+ // delete the old APISIX cluster, so that the cached state
+ // like synchronization won't be used next time the candidate
+ // becomes the leader again.
+ c.apisix.DeleteCluster(c.cfg.APISIX.DefaultClusterName)
},
},
// Set it to false as current leaderelection implementation will report
diff --git a/test/e2e/chaos/chaos.go b/test/e2e/chaos/chaos.go
new file mode 100644
index 0000000..4b152e4
--- /dev/null
+++ b/test/e2e/chaos/chaos.go
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package chaos
+
+import (
+ "fmt"
+ "net/http"
+
+ "github.com/onsi/ginkgo"
+ "github.com/stretchr/testify/assert"
+
+ "github.com/apache/apisix-ingress-controller/test/e2e/scaffold"
+)
+
+var _ = ginkgo.Describe("Chaos Testing", func() {
+ opts := &scaffold.Options{
+ Name: "default",
+ Kubeconfig: scaffold.GetKubeconfig(),
+ APISIXConfigPath: "testdata/apisix-gw-config.yaml",
+ IngressAPISIXReplicas: 1,
+ HTTPBinServicePort: 80,
+ APISIXRouteVersion: "apisix.apache.org/v2beta2",
+ }
+ s := scaffold.NewScaffold(opts)
+ ginkgo.Context("simulate apisix deployment restart", func() {
+ ginkgo.Specify("ingress controller can synchronize rules normally after apisix recovery", func() {
+ assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixRoutesCreated(0), "checking number of upstreams")
+ backendSvc, backendSvcPort := s.DefaultHTTPBackend()
+ route1 := fmt.Sprintf(`
+apiVersion: apisix.apache.org/v2beta2
+kind: ApisixRoute
+metadata:
+ name: httpbin-route1
+spec:
+ http:
+ - name: route1
+ match:
+ hosts:
+ - httpbin.org
+ paths:
+ - /ip
+ backends:
+ - serviceName: %s
+ servicePort: %d
+`, backendSvc, backendSvcPort[0])
+ assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(route1))
+ assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixRoutesCreated(1), "checking number of routes")
+ s.RestartAPISIXDeploy()
+ route2 := fmt.Sprintf(`
+apiVersion: apisix.apache.org/v2beta2
+kind: ApisixRoute
+metadata:
+ name: httpbin-route2
+spec:
+ http:
+ - name: route2
+ match:
+ hosts:
+ - httpbin.org
+ paths:
+ - /get
+ backends:
+ - serviceName: %s
+ servicePort: %d
+`, backendSvc, backendSvcPort[0])
+ assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(route2))
+ assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixRoutesCreated(2), "checking number of routes")
+ s.NewAPISIXClient().GET("/ip").WithHeader("Host", "httpbin.org").Expect().Status(http.StatusOK)
+ s.NewAPISIXClient().GET("/get").WithHeader("Host", "httpbin.org").Expect().Status(http.StatusOK)
+ })
+ })
+
+})
diff --git a/test/e2e/e2e.go b/test/e2e/e2e.go
index 07cf074..651a1c6 100644
--- a/test/e2e/e2e.go
+++ b/test/e2e/e2e.go
@@ -16,6 +16,7 @@ package e2e
import (
_ "github.com/apache/apisix-ingress-controller/test/e2e/annotations"
+ _ "github.com/apache/apisix-ingress-controller/test/e2e/chaos"
_ "github.com/apache/apisix-ingress-controller/test/e2e/config"
_ "github.com/apache/apisix-ingress-controller/test/e2e/endpoints"
_ "github.com/apache/apisix-ingress-controller/test/e2e/features"
diff --git a/test/e2e/scaffold/k8s.go b/test/e2e/scaffold/k8s.go
index 842c343..8362144 100644
--- a/test/e2e/scaffold/k8s.go
+++ b/test/e2e/scaffold/k8s.go
@@ -474,6 +474,15 @@ func (s *Scaffold) newAPISIXTunnels() error {
return nil
}
+func (s *Scaffold) shutdownApisixTunnel() {
+ s.apisixAdminTunnel.Close()
+ s.apisixHttpTunnel.Close()
+ s.apisixHttpsTunnel.Close()
+ s.apisixTCPTunnel.Close()
+ s.apisixUDPTunnel.Close()
+ s.apisixControlTunnel.Close()
+}
+
// Namespace returns the current working namespace.
func (s *Scaffold) Namespace() string {
return s.kubectlOptions.Namespace
diff --git a/test/e2e/scaffold/scaffold.go b/test/e2e/scaffold/scaffold.go
index f927f96..7d5fa08 100644
--- a/test/e2e/scaffold/scaffold.go
+++ b/test/e2e/scaffold/scaffold.go
@@ -289,6 +289,23 @@ func (s *Scaffold) APISIXGatewayServiceEndpoint() string {
return s.apisixHttpTunnel.Endpoint()
}
+// RestartAPISIXDeploy delete apisix pod and wait new pod be ready
+func (s *Scaffold) RestartAPISIXDeploy() {
+ s.shutdownApisixTunnel()
+ pods, err := k8s.ListPodsE(s.t, s.kubectlOptions, metav1.ListOptions{
+ LabelSelector: "app=apisix-deployment-e2e-test",
+ })
+ assert.NoError(s.t, err, "list apisix pod")
+ for _, pod := range pods {
+ err = s.KillPod(pod.Name)
+ assert.NoError(s.t, err, "killing apisix pod")
+ }
+ err = s.waitAllAPISIXPodsAvailable()
+ assert.NoError(s.t, err, "waiting for new apisix instance ready")
+ err = s.newAPISIXTunnels()
+ assert.NoError(s.t, err, "renew apisix tunnels")
+}
+
func (s *Scaffold) beforeEach() {
var err error
s.namespace = fmt.Sprintf("ingress-apisix-e2e-tests-%s-%d", s.opts.Name, time.Now().Nanosecond())
@@ -371,7 +388,7 @@ func (s *Scaffold) afterEach() {
assert.Nilf(ginkgo.GinkgoT(), err, "deleting namespace %s", s.namespace)
for _, f := range s.finializers {
- f()
+ runWithRecover(f)
}
// Wait for a while to prevent the worker node being overwhelming
@@ -379,6 +396,24 @@ func (s *Scaffold) afterEach() {
time.Sleep(3 * time.Second)
}
+func runWithRecover(f func()) {
+ defer func() {
+ r := recover()
+ if r == nil {
+ return
+ }
+ err, ok := r.(error)
+ if ok {
+ // just ignore already closed channel
+ if strings.Contains(err.Error(), "close of closed channel") {
+ return
+ }
+ }
+ panic(r)
+ }()
+ f()
+}
+
func (s *Scaffold) GetDeploymentLogs(name string) string {
cli, err := k8s.GetKubernetesClientE(s.t)
if err != nil {