This is an automated email from the ASF dual-hosted git repository.
kvn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push:
new 396cae2 feat: controller leader election (#173)
396cae2 is described below
commit 396cae2a5d4502a41f8683c472a1e533d82abb24
Author: Alex Zhang <[email protected]>
AuthorDate: Thu Jan 14 14:32:15 2021 +0800
feat: controller leader election (#173)
* feat: controller leader election
* test: add e2e cases
* chore: optimize e2e cases
---
charts/ingress-apisix/templates/deployment.yaml | 9 ++
cmd/ingress/ingress.go | 1 +
conf/config-default.yaml | 17 ++--
pkg/config/config.go | 5 +
pkg/config/config_test.go | 2 +
pkg/ingress/controller/controller.go | 119 ++++++++++++++++++----
samples/deploy/deployment/ingress-controller.yaml | 9 ++
test/e2e/ingress/namespace.go | 5 +
test/e2e/ingress/resourcepushing.go | 14 ++-
test/e2e/ingress/sanity.go | 64 ++++++++++++
test/e2e/scaffold/httpbin.go | 32 ++++++
test/e2e/scaffold/ingress.go | 49 ++++++++-
test/e2e/scaffold/scaffold.go | 29 ++++--
13 files changed, 315 insertions(+), 40 deletions(-)
diff --git a/charts/ingress-apisix/templates/deployment.yaml
b/charts/ingress-apisix/templates/deployment.yaml
index b96e811..b103ebd 100644
--- a/charts/ingress-apisix/templates/deployment.yaml
+++ b/charts/ingress-apisix/templates/deployment.yaml
@@ -70,6 +70,15 @@ spec:
volumeMounts:
- mountPath: /ingress-apisix/conf
name: configuration
+ env:
+ - name: POD_NAMESPACE
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.namespace
+ - name: POD_NAME
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.name
{{- with .Values.ingressController.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
diff --git a/cmd/ingress/ingress.go b/cmd/ingress/ingress.go
index 66a7748..dcaf894 100644
--- a/cmd/ingress/ingress.go
+++ b/cmd/ingress/ingress.go
@@ -126,6 +126,7 @@ the apisix cluster and others are created`,
cmd.PersistentFlags().StringVar(&cfg.Kubernetes.Kubeconfig,
"kubeconfig", "", "Kubernetes configuration file (by default in-cluster
configuration will be used)")
cmd.PersistentFlags().DurationVar(&cfg.Kubernetes.ResyncInterval.Duration,
"resync-interval", time.Minute, "the controller resync (with Kubernetes)
interval, the minimum resync interval is 30s")
cmd.PersistentFlags().StringSliceVar(&cfg.Kubernetes.AppNamespaces,
"app-namespace", []string{config.NamespaceAll}, "namespaces that controller
will watch for resources")
+ cmd.PersistentFlags().StringVar(&cfg.Kubernetes.ElectionID,
"election-id", config.IngressAPISIXLeader, "election id used for compaign the
controller leader")
cmd.PersistentFlags().StringVar(&cfg.APISIX.BaseURL, "apisix-base-url",
"", "the base URL for APISIX admin api / manager api")
cmd.PersistentFlags().StringVar(&cfg.APISIX.AdminKey,
"apisix-admin-key", "", "admin key used for the authorization of APISIX admin
api / manager api")
diff --git a/conf/config-default.yaml b/conf/config-default.yaml
index 5e34911..419338a 100644
--- a/conf/config-default.yaml
+++ b/conf/config-default.yaml
@@ -33,13 +33,16 @@ enable_profiling: true # enable profileing via web
interfaces
# Kubernetes related configurations.
kubernetes:
- kubeconfig: "" # the Kubernetes configuration file path, default is
- # "", so the in-cluster configuration will be used.
- resync_interval: "6h" # how long should apisix-ingress-controller
- # re-synchronizes with Kubernetes, default is 6h,
- # and the minimal resync interval is 30s.
- app_namespaces: ["*"] # namespace list that controller will watch for
resources,
- # by default all namespaces (represented by "*") are
watched.
+ kubeconfig: "" # the Kubernetes configuration file
path, default is
+ # "", so the in-cluster configuration
will be used.
+ resync_interval: "6h" # how long should
apisix-ingress-controller
+ # re-synchronizes with Kubernetes,
default is 6h,
+ # and the minimal resync interval is
30s.
+ app_namespaces: ["*"] # namespace list that controller will
watch for resources,
+ # by default all namespaces
(represented by "*") are watched.
+ election_id: "ingress-apisix-leader" # the election id for the controller
leader campaign,
+ # only the leader will watch and
deliver resource changes,
+ # other instances (as candidates) stand
by.
# APISIX related configurations.
apisix:
diff --git a/pkg/config/config.go b/pkg/config/config.go
index 4a00383..7627989 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -30,6 +30,9 @@ import (
const (
// NamespaceAll represents all namespaces.
NamespaceAll = "*"
+ // IngressAPISIXLeader is the default election id for the controller
+ // leader election.
+ IngressAPISIXLeader = "ingress-apisix-leader"
_minimalResyncInterval = 30 * time.Second
)
@@ -50,6 +53,7 @@ type KubernetesConfig struct {
Kubeconfig string `json:"kubeconfig" yaml:"kubeconfig"`
ResyncInterval types.TimeDuration `json:"resync_interval"
yaml:"resync_interval"`
AppNamespaces []string `json:"app_namespaces"
yaml:"app_namespaces"`
+ ElectionID string `json:"election_id"
yaml:"election_id"`
}
// APISIXConfig contains all APISIX related config items.
@@ -71,6 +75,7 @@ func NewDefaultConfig() *Config {
Kubeconfig: "", // Use in-cluster configurations.
ResyncInterval: types.TimeDuration{Duration: 6 *
time.Hour},
AppNamespaces: []string{v1.NamespaceAll},
+ ElectionID: IngressAPISIXLeader,
},
}
}
diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go
index d220b38..808ed88 100644
--- a/pkg/config/config_test.go
+++ b/pkg/config/config_test.go
@@ -36,6 +36,7 @@ func TestNewConfigFromFile(t *testing.T) {
ResyncInterval: types.TimeDuration{time.Hour},
Kubeconfig: "/path/to/foo/baz",
AppNamespaces: []string{""},
+ ElectionID: "my-election-id",
},
APISIX: APISIXConfig{
BaseURL: "http://127.0.0.1:8080/apisix",
@@ -72,6 +73,7 @@ enable_profiling: true
kubernetes:
kubeconfig: /path/to/foo/baz
resync_interval: 1h0m0s
+ election_id: my-election-id
apisix:
base_url: http://127.0.0.1:8080/apisix
admin_key: "123456"
diff --git a/pkg/ingress/controller/controller.go
b/pkg/ingress/controller/controller.go
index 2114ec2..14fef47 100644
--- a/pkg/ingress/controller/controller.go
+++ b/pkg/ingress/controller/controller.go
@@ -15,22 +15,26 @@
package controller
import (
+ "context"
"os"
"sync"
-
- v1 "k8s.io/api/core/v1"
-
- "k8s.io/client-go/tools/cache"
-
- "github.com/api7/ingress-controller/pkg/apisix"
+ "time"
clientSet
"github.com/gxthrj/apisix-ingress-types/pkg/client/clientset/versioned"
crdclientset
"github.com/gxthrj/apisix-ingress-types/pkg/client/clientset/versioned"
"github.com/gxthrj/apisix-ingress-types/pkg/client/informers/externalversions"
+ "go.uber.org/zap"
+ v1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/tools/cache"
+ "k8s.io/client-go/tools/leaderelection"
+ "k8s.io/client-go/tools/leaderelection/resourcelock"
"github.com/api7/ingress-controller/pkg/api"
+ "github.com/api7/ingress-controller/pkg/apisix"
"github.com/api7/ingress-controller/pkg/config"
"github.com/api7/ingress-controller/pkg/kube"
"github.com/api7/ingress-controller/pkg/log"
@@ -47,6 +51,9 @@ func recoverException() {
// Controller is the ingress apisix controller object.
type Controller struct {
+ name string
+ namespace string
+ cfg *config.Config
wg sync.WaitGroup
watchingNamespace map[string]struct{}
apiServer *api.Server
@@ -96,6 +103,9 @@ func NewController(cfg *config.Config) (*Controller, error) {
}
c := &Controller{
+ name: podName,
+ namespace: podNamespace,
+ cfg: cfg,
apiServer: apiSrv,
metricsCollector: metrics.NewPrometheusCollector(podName,
podNamespace),
clientset: kube.GetKubeClient(),
@@ -115,30 +125,102 @@ func (c *Controller) goAttach(handler func()) {
}()
}
+// Eventf implements the resourcelock.EventRecorder interface.
+func (c *Controller) Eventf(_ runtime.Object, eventType string, reason string,
message string, _ ...interface{}) {
+ log.Infow(reason, zap.String("message", message),
zap.String("event_type", eventType))
+}
+
// Run launches the controller.
func (c *Controller) Run(stop chan struct{}) error {
- // TODO leader election.
+ rootCtx, rootCancel := context.WithCancel(context.Background())
+ defer rootCancel()
+ go func() {
+ <-stop
+ rootCancel()
+ }()
+ c.metricsCollector.ResetLeader(false)
+
+ go func() {
+ if err := c.apiServer.Run(rootCtx.Done()); err != nil {
+ log.Errorf("failed to launch API Server: %s", err)
+ }
+ }()
+
+ lock := &resourcelock.LeaseLock{
+ LeaseMeta: metav1.ObjectMeta{
+ Namespace: c.namespace,
+ Name: c.cfg.Kubernetes.ElectionID,
+ },
+ Client: c.clientset.CoordinationV1(),
+ LockConfig: resourcelock.ResourceLockConfig{
+ Identity: c.name,
+ EventRecorder: c,
+ },
+ }
+ cfg := leaderelection.LeaderElectionConfig{
+ Lock: lock,
+ LeaseDuration: 15 * time.Second,
+ RenewDeadline: 5 * time.Second,
+ RetryPeriod: 2 * time.Second,
+ Callbacks: leaderelection.LeaderCallbacks{
+ OnStartedLeading: c.run,
+ OnNewLeader: func(identity string) {
+ log.Warnf("found a new leader %s", identity)
+ if identity != c.name {
+ log.Infow("controller now is running as
a candidate",
+ zap.String("namespace",
c.namespace),
+ zap.String("pod", c.name),
+ )
+ }
+ },
+ OnStoppedLeading: func() {
+ log.Infow("controller now is running as a
candidate",
+ zap.String("namespace", c.namespace),
+ zap.String("pod", c.name),
+ )
+ c.metricsCollector.ResetLeader(false)
+ },
+ },
+ ReleaseOnCancel: true,
+ Name: "ingress-apisix",
+ }
+
+ elector, err := leaderelection.NewLeaderElector(cfg)
+ if err != nil {
+ log.Errorf("failed to create leader elector: %s", err.Error())
+ return err
+ }
+
+election:
+ elector.Run(rootCtx)
+ select {
+ case <-rootCtx.Done():
+ return nil
+ default:
+ goto election
+ }
+}
+
+func (c *Controller) run(ctx context.Context) {
+ log.Infow("controller now is running as leader",
+ zap.String("namespace", c.namespace),
+ zap.String("pod", c.name),
+ )
c.metricsCollector.ResetLeader(true)
- log.Info("controller run as leader")
ac := &Api6Controller{
KubeClientSet: c.clientset,
Api6ClientSet: c.crdClientset,
SharedInformerFactory: c.crdInformerFactory,
CoreSharedInformerFactory: kube.CoreSharedInformerFactory,
- Stop: stop,
+ Stop: ctx.Done(),
}
epInformer := ac.CoreSharedInformerFactory.Core().V1().Endpoints()
kube.EndpointsInformer = epInformer
// endpoint
ac.Endpoint(c)
c.goAttach(func() {
- ac.CoreSharedInformerFactory.Start(stop)
- })
- c.goAttach(func() {
- if err := c.apiServer.Run(stop); err != nil {
- log.Errorf("failed to launch API Server: %s", err)
- }
+ ac.CoreSharedInformerFactory.Start(ctx.Done())
})
// ApisixRoute
@@ -151,12 +233,11 @@ func (c *Controller) Run(stop chan struct{}) error {
ac.ApisixTLS(c)
c.goAttach(func() {
- ac.SharedInformerFactory.Start(stop)
+ ac.SharedInformerFactory.Start(ctx.Done())
})
- <-stop
+ <-ctx.Done()
c.wg.Wait()
- return nil
}
// namespaceWatching accepts a resource key, getting the namespace part
@@ -182,7 +263,7 @@ type Api6Controller struct {
Api6ClientSet clientSet.Interface
SharedInformerFactory externalversions.SharedInformerFactory
CoreSharedInformerFactory informers.SharedInformerFactory
- Stop chan struct{}
+ Stop <-chan struct{}
}
func (api6 *Api6Controller) ApisixRoute(controller *Controller) {
diff --git a/samples/deploy/deployment/ingress-controller.yaml
b/samples/deploy/deployment/ingress-controller.yaml
index 98267f9..eae43e1 100644
--- a/samples/deploy/deployment/ingress-controller.yaml
+++ b/samples/deploy/deployment/ingress-controller.yaml
@@ -50,6 +50,15 @@ spec:
- mountPath: /ingress-apisix/conf/config.yaml
name: apisix-ingress-configmap
subPath: config.yaml
+ env:
+ - name: POD_NAMESPACE
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.namespace
+ - name: POD_NAME
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.name
volumes:
- configMap:
name: apisix-ingress-cm
diff --git a/test/e2e/ingress/namespace.go b/test/e2e/ingress/namespace.go
index 23601ff..8cc654b 100644
--- a/test/e2e/ingress/namespace.go
+++ b/test/e2e/ingress/namespace.go
@@ -19,6 +19,7 @@ import (
"encoding/json"
"fmt"
"net/http"
+ "time"
"github.com/api7/ingress-controller/test/e2e/scaffold"
"github.com/onsi/ginkgo"
@@ -49,6 +50,10 @@ spec:
assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixRoutesCreated(1),
"checking number of routes")
assert.Nil(ginkgo.GinkgoT(),
s.EnsureNumApisixUpstreamsCreated(1), "checking number of upstreams")
+ // TODO When ingress controller can feed back the lifecycle of
CRDs to the
+ // status field, we can poll it rather than sleeping.
+ time.Sleep(3 * time.Second)
+
body := s.NewAPISIXClient().GET("/ip").WithHeader("Host",
"httpbin.com").Expect().Status(http.StatusOK).Body().Raw()
var placeholder ip
err := json.Unmarshal([]byte(body), &placeholder)
diff --git a/test/e2e/ingress/resourcepushing.go
b/test/e2e/ingress/resourcepushing.go
index 2ffba3f..70490ff 100644
--- a/test/e2e/ingress/resourcepushing.go
+++ b/test/e2e/ingress/resourcepushing.go
@@ -49,10 +49,11 @@ spec:
assert.Nil(ginkgo.GinkgoT(), err, "Checking number of routes")
err = s.EnsureNumApisixUpstreamsCreated(1)
assert.Nil(ginkgo.GinkgoT(), err, "Checking number of
upstreams")
- scale := 2
- err = s.ScaleHTTPBIN(scale)
- assert.Nil(ginkgo.GinkgoT(), err)
- time.Sleep(5 * time.Second) // wait for ingress to sync
+ assert.Nil(ginkgo.GinkgoT(), s.ScaleHTTPBIN(2), "scaling number
of httpbin instancess")
+ assert.Nil(ginkgo.GinkgoT(), s.WaitAllHTTPBINPoddsAvailable(),
"waiting for all httpbin pods ready")
+ // TODO When ingress controller can feed back the lifecycle of
CRDs to the
+ // status field, we can poll it rather than sleeping.
+ time.Sleep(5 * time.Second)
ups, err := s.ListApisixUpstreams()
assert.Nil(ginkgo.GinkgoT(), err, "list upstreams error")
assert.Len(ginkgo.GinkgoT(), ups[0].Nodes, 2, "upstreams nodes
not expect")
@@ -84,7 +85,10 @@ spec:
// remove
assert.Nil(ginkgo.GinkgoT(),
s.RemoveResourceByString(apisixRoute))
- time.Sleep(10 * time.Second) // wait for ingress to sync
+
+ // TODO When ingress controller can feed back the lifecycle of
CRDs to the
+ // status field, we can poll it rather than sleeping.
+ time.Sleep(10 * time.Second)
ups, err := s.ListApisixUpstreams()
assert.Nil(ginkgo.GinkgoT(), err, "list upstreams error")
assert.Len(ginkgo.GinkgoT(), ups, 0, "upstreams nodes not
expect")
diff --git a/test/e2e/ingress/sanity.go b/test/e2e/ingress/sanity.go
index 5b2c5cd..fc43fc6 100644
--- a/test/e2e/ingress/sanity.go
+++ b/test/e2e/ingress/sanity.go
@@ -17,6 +17,7 @@ package ingress
import (
"encoding/json"
"net/http"
+ "time"
"github.com/api7/ingress-controller/test/e2e/scaffold"
"github.com/onsi/ginkgo"
@@ -51,6 +52,11 @@ var _ = ginkgo.Describe("single-route", func() {
assert.Nil(ginkgo.GinkgoT(), err, "checking number of routes")
err = s.EnsureNumApisixUpstreamsCreated(1)
assert.Nil(ginkgo.GinkgoT(), err, "checking number of
upstreams")
+
+ // TODO When ingress controller can feed back the lifecycle of
CRDs to the
+ // status field, we can poll it rather than sleeping.
+ time.Sleep(3 * time.Second)
+
body := s.NewAPISIXClient().GET("/ip").WithHeader("Host",
"httpbin.com").Expect().Status(http.StatusOK).Body().Raw()
var placeholder ip
err = json.Unmarshal([]byte(body), &placeholder)
@@ -91,6 +97,9 @@ var _ = ginkgo.Describe("double-routes", func() {
assert.Nil(ginkgo.GinkgoT(), err, "checking number of routes")
err = s.EnsureNumApisixUpstreamsCreated(1)
assert.Nil(ginkgo.GinkgoT(), err, "checking number of
upstreams")
+ // TODO When ingress controller can feed back the lifecycle of
CRDs to the
+ // status field, we can poll it rather than sleeping.
+ time.Sleep(3 * time.Second)
body := s.NewAPISIXClient().GET("/ip").WithHeader("Host",
"httpbin.com").Expect().Status(http.StatusOK).Body().Raw()
var placeholder ip
err = json.Unmarshal([]byte(body), &placeholder)
@@ -103,3 +112,58 @@ var _ = ginkgo.Describe("double-routes", func() {
// We don't care the json data, only make sure it's a normal
json string.
})
})
+
+var _ = ginkgo.Describe("leader election", func() {
+ s := scaffold.NewScaffold(&scaffold.Options{
+ Name: "leaderelection",
+ Kubeconfig: scaffold.GetKubeconfig(),
+ APISIXConfigPath: "testdata/apisix-gw-config.yaml",
+ APISIXDefaultConfigPath:
"testdata/apisix-gw-config-default.yaml",
+ IngressAPISIXReplicas: 2,
+ })
+ ginkgo.It("lease check", func() {
+ pods, err := s.GetIngressPodDetails()
+ assert.Nil(ginkgo.GinkgoT(), err)
+ assert.Len(ginkgo.GinkgoT(), pods, 2)
+ lease, err := s.WaitGetLeaderLease()
+ assert.Nil(ginkgo.GinkgoT(), err)
+ assert.Equal(ginkgo.GinkgoT(),
*lease.Spec.LeaseDurationSeconds, int32(15))
+ if *lease.Spec.HolderIdentity != pods[0].Name &&
*lease.Spec.HolderIdentity != pods[1].Name {
+ assert.Fail(ginkgo.GinkgoT(), "bad leader lease holder
identity")
+ }
+ })
+
+ ginkgo.It("leader failover", func() {
+ pods, err := s.GetIngressPodDetails()
+ assert.Nil(ginkgo.GinkgoT(), err)
+ assert.Len(ginkgo.GinkgoT(), pods, 2)
+
+ lease, err := s.WaitGetLeaderLease()
+ assert.Nil(ginkgo.GinkgoT(), err)
+
+ leaderIdx := 0
+ if *lease.Spec.HolderIdentity == pods[1].Name {
+ leaderIdx = 1
+ }
+ ginkgo.GinkgoT().Logf("lease is %s", *lease.Spec.HolderIdentity)
+ assert.Nil(ginkgo.GinkgoT(), s.KillPod(pods[leaderIdx].Name))
+
+ // Wait for the old lease to expire and a new leader to be elected.
+ time.Sleep(25 * time.Second)
+
+ newLease, err := s.WaitGetLeaderLease()
+ assert.Nil(ginkgo.GinkgoT(), err)
+
+ newPods, err := s.GetIngressPodDetails()
+ assert.Nil(ginkgo.GinkgoT(), err)
+ assert.Len(ginkgo.GinkgoT(), pods, 2)
+
+ assert.NotEqual(ginkgo.GinkgoT(),
*newLease.Spec.HolderIdentity, *lease.Spec.HolderIdentity)
+ assert.Greater(ginkgo.GinkgoT(),
*newLease.Spec.LeaseTransitions, *lease.Spec.LeaseTransitions)
+
+ if *newLease.Spec.HolderIdentity != newPods[0].Name &&
*newLease.Spec.HolderIdentity != newPods[1].Name {
+ assert.Failf(ginkgo.GinkgoT(), "bad leader lease holder
identity: %s, should be %s or %s",
+ *newLease.Spec.HolderIdentity, newPods[0].Name,
newPods[1].Name)
+ }
+ })
+})
diff --git a/test/e2e/scaffold/httpbin.go b/test/e2e/scaffold/httpbin.go
index 33401b3..f9e1f87 100644
--- a/test/e2e/scaffold/httpbin.go
+++ b/test/e2e/scaffold/httpbin.go
@@ -18,6 +18,9 @@ import (
"fmt"
"time"
+ "github.com/onsi/ginkgo"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
"github.com/gruntwork-io/terratest/modules/k8s"
corev1 "k8s.io/api/core/v1"
)
@@ -112,3 +115,32 @@ func (s *Scaffold) ScaleHTTPBIN(desired int) error {
}
return nil
}
+
+// WaitAllHTTPBINPoddsAvailable waits until all httpbin pods are ready.
+func (s *Scaffold) WaitAllHTTPBINPoddsAvailable() error {
+ opts := metav1.ListOptions{
+ LabelSelector: "app=httpbin-deployment-e2e-test",
+ }
+ condFunc := func() (bool, error) {
+ items, err := k8s.ListPodsE(s.t, s.kubectlOptions, opts)
+ if err != nil {
+ return false, err
+ }
+ if len(items) == 0 {
+ ginkgo.GinkgoT().Log("no apisix pods created")
+ return false, nil
+ }
+ for _, item := range items {
+ for _, cond := range item.Status.Conditions {
+ if cond.Type != corev1.PodReady {
+ continue
+ }
+ if cond.Status != "True" {
+ return false, nil
+ }
+ }
+ }
+ return true, nil
+ }
+ return waitExponentialBackoff(condFunc)
+}
diff --git a/test/e2e/scaffold/ingress.go b/test/e2e/scaffold/ingress.go
index 9a6de56..6076eb9 100644
--- a/test/e2e/scaffold/ingress.go
+++ b/test/e2e/scaffold/ingress.go
@@ -15,11 +15,15 @@
package scaffold
import (
+ "context"
"fmt"
"github.com/gruntwork-io/terratest/modules/k8s"
"github.com/onsi/ginkgo"
+ coordinationv1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
+ v1 "k8s.io/api/core/v1"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
@@ -45,7 +49,7 @@ kind: Deployment
metadata:
name: ingress-apisix-controller-deployment-e2e-test
spec:
- replicas: 1
+ replicas: %d
selector:
matchLabels:
app: ingress-apisix-controller-deployment-e2e-test
@@ -77,6 +81,15 @@ spec:
tcpSocket:
port: 8080
timeoutSeconds: 2
+ env:
+ - name: POD_NAMESPACE
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.namespace
+ - name: POD_NAME
+ valueFrom:
+ fieldRef:
+ fieldPath: metadata.name
image: "apache/apisix-ingress-controller:dev"
imagePullPolicy: Never
name: ingress-apisix-controller-deployment-e2e-test
@@ -102,7 +115,7 @@ spec:
)
func (s *Scaffold) newIngressAPISIXController() error {
- ingressAPISIXDeployment :=
fmt.Sprintf(_ingressAPISIXDeploymentTemplate, s.namespace)
+ ingressAPISIXDeployment :=
fmt.Sprintf(_ingressAPISIXDeploymentTemplate, s.opts.IngressAPISIXReplicas,
s.namespace)
if err := k8s.CreateServiceAccountE(s.t, s.kubectlOptions,
_serviceAccount); err != nil {
return err
}
@@ -144,3 +157,35 @@ func (s *Scaffold) waitAllIngressControllerPodsAvailable()
error {
}
return waitExponentialBackoff(condFunc)
}
+
+// WaitGetLeaderLease waits for the lease to be created and returns it.
+func (s *Scaffold) WaitGetLeaderLease() (*coordinationv1.Lease, error) {
+ cli, err := k8s.GetKubernetesClientE(s.t)
+ if err != nil {
+ return nil, err
+ }
+ var lease *coordinationv1.Lease
+ condFunc := func() (bool, error) {
+ l, err :=
cli.CoordinationV1().Leases(s.namespace).Get(context.TODO(),
"ingress-apisix-leader", metav1.GetOptions{})
+ if err != nil {
+ if k8serrors.IsNotFound(err) {
+ return false, nil
+ }
+ return false, err
+ }
+ lease = l
+ return true, nil
+ }
+ if err := waitExponentialBackoff(condFunc); err != nil {
+ return nil, err
+ }
+ return lease, nil
+}
+
+// GetIngressPodDetails returns a batch of pod descriptions
+// about apisix-ingress-controller.
+func (s *Scaffold) GetIngressPodDetails() ([]v1.Pod, error) {
+ return k8s.ListPodsE(s.t, s.kubectlOptions, metav1.ListOptions{
+ LabelSelector:
"app=ingress-apisix-controller-deployment-e2e-test",
+ })
+}
diff --git a/test/e2e/scaffold/scaffold.go b/test/e2e/scaffold/scaffold.go
index fdd1998..6e3e851 100644
--- a/test/e2e/scaffold/scaffold.go
+++ b/test/e2e/scaffold/scaffold.go
@@ -15,6 +15,7 @@
package scaffold
import (
+ "context"
"fmt"
"io/ioutil"
"net/http"
@@ -26,6 +27,8 @@ import (
"text/template"
"time"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
"github.com/gavv/httpexpect/v2"
"github.com/gruntwork-io/terratest/modules/k8s"
"github.com/gruntwork-io/terratest/modules/testing"
@@ -41,6 +44,7 @@ type Options struct {
Kubeconfig string
APISIXConfigPath string
APISIXDefaultConfigPath string
+ IngressAPISIXReplicas int
}
type Scaffold struct {
@@ -101,10 +105,20 @@ func NewDefaultScaffold() *Scaffold {
Kubeconfig: GetKubeconfig(),
APISIXConfigPath: "testdata/apisix-gw-config.yaml",
APISIXDefaultConfigPath:
"testdata/apisix-gw-config-default.yaml",
+ IngressAPISIXReplicas: 1,
}
return NewScaffold(opts)
}
+// KillPod kills the pod whose name is podName.
+func (s *Scaffold) KillPod(podName string) error {
+ cli, err := k8s.GetKubernetesClientE(s.t)
+ if err != nil {
+ return err
+ }
+ return cli.CoreV1().Pods(s.namespace).Delete(context.TODO(), podName,
metav1.DeleteOptions{})
+}
+
// DefaultHTTPBackend returns the service name and service ports
// of the default http backend.
func (s *Scaffold) DefaultHTTPBackend() (string, []int32) {
@@ -149,14 +163,12 @@ func (s *Scaffold) beforeEach() {
s.etcdService, err = s.newEtcd()
assert.Nil(s.t, err, "initializing etcd")
- // We don't use k8s.WaitUntilServiceAvailable since it hacks for
Minikube.
- err = k8s.WaitUntilNumPodsCreatedE(s.t, s.kubectlOptions,
s.labelSelector("app=etcd-deployment-e2e-test"), 1, 5, 2*time.Second)
+ err = s.waitAllEtcdPodsAvailable()
assert.Nil(s.t, err, "waiting for etcd ready")
s.apisixService, err = s.newAPISIX()
assert.Nil(s.t, err, "initializing Apache APISIX")
- // We don't use k8s.WaitUntilServiceAvailable since it hacks for
Minikube.
err = s.waitAllAPISIXPodsAvailable()
assert.Nil(s.t, err, "waiting for apisix ready")
@@ -180,6 +192,10 @@ func (s *Scaffold) afterEach() {
for _, f := range s.finializers {
f()
}
+
+ // Wait for a while to prevent the worker node from being overwhelmed
+ // (new cases will be run).
+ time.Sleep(3 * time.Second)
}
func (s *Scaffold) addFinializer(f func()) {
@@ -202,10 +218,9 @@ func (s *Scaffold) renderConfig(path string) (string,
error) {
func waitExponentialBackoff(condFunc func() (bool, error)) error {
backoff := wait.Backoff{
- Duration: 100 * time.Millisecond,
- Factor: 3,
- Jitter: 0,
- Steps: 6,
+ Duration: 500 * time.Millisecond,
+ Factor: 2,
+ Steps: 8,
}
return wait.ExponentialBackoff(backoff, condFunc)
}