This is an automated email from the ASF dual-hosted git repository.

tokers pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git


The following commit(s) were added to refs/heads/master by this push:
     new 582c4b3  chore: add health check to apisix-admin and make the leader election recyclable (#499)
582c4b3 is described below

commit 582c4b362f26ffa8372bf520c3f774170a56c290
Author: Alex Zhang <[email protected]>
AuthorDate: Wed May 26 16:39:58 2021 +0800

    chore: add health check to apisix-admin and make the leader election recyclable (#499)
    
    Co-authored-by: Liu Peng <[email protected]>
---
 pkg/apisix/apisix.go                          |   6 +-
 pkg/apisix/cluster.go                         | 122 ++++++++++++++++-----
 pkg/apisix/nonexistentclient.go               |   4 +
 pkg/ingress/apisix_cluster_config.go          |   3 +-
 pkg/ingress/apisix_route.go                   |   3 +-
 pkg/ingress/apisix_tls.go                     |   3 +-
 pkg/ingress/apisix_upstream.go                |   3 +-
 pkg/ingress/controller.go                     | 151 +++++++++++++++++---------
 pkg/ingress/endpoint.go                       |   2 +-
 pkg/ingress/ingress.go                        |   2 +-
 pkg/ingress/secret.go                         |   2 +-
 pkg/kube/apisix/apis/config/v2alpha1/types.go |   4 +
 pkg/kube/init.go                              |  29 +++--
 13 files changed, 235 insertions(+), 99 deletions(-)
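
In short, this commit adds a HealthCheck(ctx) method to the apisix Cluster interface (a short TCP probe against the Admin API host, retried a few times), and the controller's leader loop now polls it every five seconds, cancelling its own context so the leader election can be retried when the probe keeps failing. Below is a minimal illustrative sketch of that contract, not the committed code; the tcpCluster and watch helpers are hypothetical stand-ins for pkg/apisix/cluster.go and Controller.checkClusterHealth.

package main

import (
        "context"
        "log"
        "net"
        "time"
)

// HealthChecker mirrors the HealthCheck method added to the apisix.Cluster interface.
type HealthChecker interface {
        HealthCheck(ctx context.Context) error
}

// tcpCluster is a toy stand-in that, like the commit, probes the Admin API
// endpoint with a short TCP dial.
type tcpCluster struct {
        host string // e.g. "127.0.0.1:9180" (hypothetical address)
}

func (c *tcpCluster) HealthCheck(ctx context.Context) error {
        d := net.Dialer{Timeout: 3 * time.Second}
        conn, err := d.DialContext(ctx, "tcp", c.host)
        if err != nil {
                return err
        }
        return conn.Close()
}

// watch polls the cluster every 5 seconds, mirroring checkClusterHealth in the
// diff below: the first failed check cancels the leader context so the
// election can run again ("recyclable" leader election).
func watch(ctx context.Context, cancel context.CancelFunc, c HealthChecker) {
        defer cancel()
        for {
                select {
                case <-ctx.Done():
                        return
                case <-time.After(5 * time.Second):
                }
                if err := c.HealthCheck(ctx); err != nil {
                        log.Printf("health check failed, giving up leadership: %v", err)
                        return
                }
        }
}

func main() {
        ctx, cancel := context.WithCancel(context.Background())
        go watch(ctx, cancel, &tcpCluster{host: "127.0.0.1:9180"})
        <-ctx.Done()
}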

diff --git a/pkg/apisix/apisix.go b/pkg/apisix/apisix.go
index ffcb405..4200e0a 100644
--- a/pkg/apisix/apisix.go
+++ b/pkg/apisix/apisix.go
@@ -50,6 +50,8 @@ type Cluster interface {
        String() string
        // HasSynced checks whether all resources in APISIX cluster is synced to cache.
        HasSynced(context.Context) error
+       // HealthCheck checks apisix cluster health in realtime.
+       HealthCheck(context.Context) error
 }
 
 // Route is the specific client interface to take over the create, update,
@@ -122,6 +124,7 @@ type apisix struct {
 func NewClient() (APISIX, error) {
        cli := &apisix{
                nonExistentCluster: newNonExistentCluster(),
+               clusters:           make(map[string]Cluster),
        }
        return cli, nil
 }
@@ -160,9 +163,6 @@ func (c *apisix) AddCluster(co *ClusterOptions) error {
        if err != nil {
                return err
        }
-       if c.clusters == nil {
-               c.clusters = make(map[string]Cluster)
-       }
        c.clusters[co.Name] = cluster
        return nil
 }
diff --git a/pkg/apisix/cluster.go b/pkg/apisix/cluster.go
index d41e5dd..4d42d1e 100644
--- a/pkg/apisix/cluster.go
+++ b/pkg/apisix/cluster.go
@@ -21,7 +21,9 @@ import (
        "fmt"
        "io"
        "io/ioutil"
+       "net"
        "net/http"
+       "net/url"
        "strings"
        "sync/atomic"
        "time"
@@ -37,8 +39,7 @@ import (
 const (
        _defaultTimeout = 5 * time.Second
 
-       _cacheNotSync = iota
-       _cacheSyncing
+       _cacheSyncing = iota
        _cacheSynced
 )
 
@@ -50,6 +51,19 @@ var (
        ErrDuplicatedCluster = errors.New("duplicated cluster")
 
        _errReadOnClosedResBody = errors.New("http: read on closed response body")
+
+       // Default shared transport for apisix client
+       _defaultTransport = &http.Transport{
+               Proxy: http.ProxyFromEnvironment,
+               Dial: (&net.Dialer{
+                       Timeout: 3 * time.Second,
+               }).Dial,
+               DialContext: (&net.Dialer{
+                       Timeout: 3 * time.Second,
+               }).DialContext,
+               ResponseHeaderTimeout: 30 * time.Second,
+               ExpectContinueTimeout: 1 * time.Second,
+       }
 )
 
 // ClusterOptions contains parameters to customize APISIX client.
@@ -63,6 +77,7 @@ type ClusterOptions struct {
 type cluster struct {
        name         string
        baseURL      string
+       baseURLHost  string
        adminKey     string
        cli          *http.Client
        cacheState   int32
@@ -86,19 +101,21 @@ func newCluster(o *ClusterOptions) (Cluster, error) {
        }
        o.BaseURL = strings.TrimSuffix(o.BaseURL, "/")
 
+       u, err := url.Parse(o.BaseURL)
+       if err != nil {
+               return nil, err
+       }
+
        c := &cluster{
-               name:     o.Name,
-               baseURL:  o.BaseURL,
-               adminKey: o.AdminKey,
+               name:        o.Name,
+               baseURL:     o.BaseURL,
+               baseURLHost: u.Host,
+               adminKey:    o.AdminKey,
                cli: &http.Client{
-                       Timeout: o.Timeout,
-                       Transport: &http.Transport{
-                               ResponseHeaderTimeout: o.Timeout,
-                               ExpectContinueTimeout: o.Timeout,
-                       },
+                       Timeout:   o.Timeout,
+                       Transport: _defaultTransport,
                },
-               cache:       nil,
-               cacheState:  _cacheNotSync,
+               cacheState:  _cacheSyncing, // default state
                cacheSynced: make(chan struct{}),
        }
        c.route = newRouteClient(c)
@@ -108,6 +125,11 @@ func newCluster(o *ClusterOptions) (Cluster, error) {
        c.globalRules = newGlobalRuleClient(c)
        c.consumer = newConsumerClient(c)
 
+       c.cache, err = cache.NewMemDBCache()
+       if err != nil {
+               return nil, err
+       }
+
        go c.syncCache()
 
        return c, nil
@@ -116,9 +138,6 @@ func newCluster(o *ClusterOptions) (Cluster, error) {
 func (c *cluster) syncCache() {
        log.Infow("syncing cache", zap.String("cluster", c.name))
        now := time.Now()
-       if !atomic.CompareAndSwapInt32(&c.cacheState, _cacheNotSync, _cacheSyncing) {
-               panic("dubious state when sync cache")
-       }
        defer func() {
                if c.cacheSyncErr == nil {
                        log.Infow("cache synced",
@@ -134,13 +153,20 @@ func (c *cluster) syncCache() {
        }()
 
        backoff := wait.Backoff{
-               Duration: time.Second,
-               Factor:   2,
-               Steps:    6,
-       }
-       err := wait.ExponentialBackoff(backoff, c.syncCacheOnce)
+               Duration: 2 * time.Second,
+               Factor:   1,
+               Steps:    5,
+       }
+       var lastSyncErr error
+       err := wait.ExponentialBackoff(backoff, func() (done bool, _ error) {
+               // syncCacheOnce never returns false with a nil error, so
+               // lastSyncErr can be used safely once the backoff gives up.
+               done, lastSyncErr = c.syncCacheOnce()
+               return
+       })
        if err != nil {
-               c.cacheSyncErr = err
+               // err is wait.ErrWaitTimeout here; record the real sync error instead.
+               c.cacheSyncErr = lastSyncErr
        }
        close(c.cacheSynced)
 
@@ -150,12 +176,6 @@ func (c *cluster) syncCache() {
 }
 
 func (c *cluster) syncCacheOnce() (bool, error) {
-       dbcache, err := cache.NewMemDBCache()
-       if err != nil {
-               return false, err
-       }
-       c.cache = dbcache
-
        routes, err := c.route.List(context.TODO())
        if err != nil {
                log.Errorf("failed to list route in APISIX: %s", err)
@@ -306,6 +326,54 @@ func (c *cluster) GlobalRule() GlobalRule {
        return c.globalRules
 }
 
+// HealthCheck implements Cluster.HealthCheck method.
+func (c *cluster) HealthCheck(ctx context.Context) (err error) {
+       if c.cacheSyncErr != nil {
+               err = c.cacheSyncErr
+               return
+       }
+       if atomic.LoadInt32(&c.cacheState) == _cacheSyncing {
+               return
+       }
+
+       // Retry three times in a row, and exit if all of them fail.
+       backoff := wait.Backoff{
+               Duration: 5 * time.Second,
+               Factor:   1,
+               Steps:    3,
+       }
+       var lastCheckErr error
+       err = wait.ExponentialBackoffWithContext(ctx, backoff, func() (done bool, _ error) {
+               if lastCheckErr = c.healthCheck(ctx); lastCheckErr != nil {
+                       log.Warnf("failed to check health for cluster %s: %s, will retry", c.name, lastCheckErr)
+                       return
+               }
+               done = true
+               return
+       })
+       if err != nil {
+               // err is wait.ErrWaitTimeout here; record the last health check error instead.
+               c.cacheSyncErr = lastCheckErr
+       }
+       return err
+}
+
+func (c *cluster) healthCheck(ctx context.Context) (err error) {
+       // tcp socket probe
+       d := net.Dialer{Timeout: 3 * time.Second}
+       conn, err := d.DialContext(ctx, "tcp", c.baseURLHost)
+       if err != nil {
+               return err
+       }
+       if er := conn.Close(); er != nil {
+               log.Warnw("failed to close tcp probe connection",
+                       zap.Error(er),
+                       zap.String("cluster", c.name),
+               )
+       }
+       return
+}
+
 func (c *cluster) applyAuth(req *http.Request) {
        if c.adminKey != "" {
                req.Header.Set("X-API-Key", c.adminKey)
diff --git a/pkg/apisix/nonexistentclient.go b/pkg/apisix/nonexistentclient.go
index 63c12a4..a3c38ee 100644
--- a/pkg/apisix/nonexistentclient.go
+++ b/pkg/apisix/nonexistentclient.go
@@ -180,6 +180,10 @@ func (nc *nonExistentCluster) HasSynced(_ context.Context) error {
        return nil
 }
 
+func (nc *nonExistentCluster) HealthCheck(_ context.Context) error {
+       return nil
+}
+
 func (nc *nonExistentCluster) String() string {
        return "non-existent cluster"
 }
diff --git a/pkg/ingress/apisix_cluster_config.go b/pkg/ingress/apisix_cluster_config.go
index c275a3c..5df2e5d 100644
--- a/pkg/ingress/apisix_cluster_config.go
+++ b/pkg/ingress/apisix_cluster_config.go
@@ -54,6 +54,8 @@ func (c *Controller) newApisixClusterConfigController() *apisixClusterConfigCont
 func (c *apisixClusterConfigController) run(ctx context.Context) {
        log.Info("ApisixClusterConfig controller started")
        defer log.Info("ApisixClusterConfig controller exited")
+       defer c.workqueue.ShutDown()
+
        if ok := cache.WaitForCacheSync(ctx.Done(), c.controller.apisixClusterConfigInformer.HasSynced); !ok {
                log.Error("cache sync failed")
                return
@@ -62,7 +64,6 @@ func (c *apisixClusterConfigController) run(ctx context.Context) {
                go c.runWorker(ctx)
        }
        <-ctx.Done()
-       c.workqueue.ShutDown()
 }
 
 func (c *apisixClusterConfigController) runWorker(ctx context.Context) {
diff --git a/pkg/ingress/apisix_route.go b/pkg/ingress/apisix_route.go
index a374626..1172c73 100644
--- a/pkg/ingress/apisix_route.go
+++ b/pkg/ingress/apisix_route.go
@@ -56,6 +56,8 @@ func (c *Controller) newApisixRouteController() *apisixRouteController {
 func (c *apisixRouteController) run(ctx context.Context) {
        log.Info("ApisixRoute controller started")
        defer log.Info("ApisixRoute controller exited")
+       defer c.workqueue.ShutDown()
+
        ok := cache.WaitForCacheSync(ctx.Done(), c.controller.apisixRouteInformer.HasSynced)
        if !ok {
                log.Error("cache sync failed")
@@ -66,7 +68,6 @@ func (c *apisixRouteController) run(ctx context.Context) {
                go c.runWorker(ctx)
        }
        <-ctx.Done()
-       c.workqueue.ShutDown()
 }
 
 func (c *apisixRouteController) runWorker(ctx context.Context) {
diff --git a/pkg/ingress/apisix_tls.go b/pkg/ingress/apisix_tls.go
index 169eb1d..643d37b 100644
--- a/pkg/ingress/apisix_tls.go
+++ b/pkg/ingress/apisix_tls.go
@@ -57,6 +57,8 @@ func (c *Controller) newApisixTlsController() *apisixTlsController {
 func (c *apisixTlsController) run(ctx context.Context) {
        log.Info("ApisixTls controller started")
        defer log.Info("ApisixTls controller exited")
+       defer c.workqueue.ShutDown()
+
        if ok := cache.WaitForCacheSync(ctx.Done(), c.controller.apisixTlsInformer.HasSynced, c.controller.secretInformer.HasSynced); !ok {
                log.Errorf("informers sync failed")
                return
@@ -66,7 +68,6 @@ func (c *apisixTlsController) run(ctx context.Context) {
        }
 
        <-ctx.Done()
-       c.workqueue.ShutDown()
 }
 
 func (c *apisixTlsController) runWorker(ctx context.Context) {
diff --git a/pkg/ingress/apisix_upstream.go b/pkg/ingress/apisix_upstream.go
index a05d9c1..4ba0162 100644
--- a/pkg/ingress/apisix_upstream.go
+++ b/pkg/ingress/apisix_upstream.go
@@ -57,6 +57,8 @@ func (c *Controller) newApisixUpstreamController() *apisixUpstreamController {
 func (c *apisixUpstreamController) run(ctx context.Context) {
        log.Info("ApisixUpstream controller started")
        defer log.Info("ApisixUpstream controller exited")
+       defer c.workqueue.ShutDown()
+
        if ok := cache.WaitForCacheSync(ctx.Done(), c.controller.apisixUpstreamInformer.HasSynced, c.controller.svcInformer.HasSynced); !ok {
                log.Error("cache sync failed")
                return
@@ -66,7 +68,6 @@ func (c *apisixUpstreamController) run(ctx context.Context) {
        }
 
        <-ctx.Done()
-       c.workqueue.ShutDown()
 }
 
 func (c *apisixUpstreamController) runWorker(ctx context.Context) {
diff --git a/pkg/ingress/controller.go b/pkg/ingress/controller.go
index f573564..9914c02 100644
--- a/pkg/ingress/controller.go
+++ b/pkg/ingress/controller.go
@@ -135,9 +135,7 @@ func NewController(cfg *config.Config) (*Controller, error) {
        }
 
        var (
-               watchingNamespace   map[string]struct{}
-               ingressInformer     cache.SharedIndexInformer
-               apisixRouteInformer cache.SharedIndexInformer
+               watchingNamespace map[string]struct{}
        )
        if len(cfg.Kubernetes.AppNamespaces) > 1 || cfg.Kubernetes.AppNamespaces[0] != v1.NamespaceAll {
                watchingNamespace = make(map[string]struct{}, len(cfg.Kubernetes.AppNamespaces))
@@ -146,27 +144,6 @@ func NewController(cfg *config.Config) (*Controller, error) {
                }
        }
 
-       ingressLister := kube.NewIngressLister(
-               kubeClient.SharedIndexInformerFactory.Networking().V1().Ingresses().Lister(),
-               kubeClient.SharedIndexInformerFactory.Networking().V1beta1().Ingresses().Lister(),
-               kubeClient.SharedIndexInformerFactory.Extensions().V1beta1().Ingresses().Lister(),
-       )
-       apisixRouteLister := kube.NewApisixRouteLister(kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixRoutes().Lister(),
-               kubeClient.APISIXSharedIndexInformerFactory.Apisix().V2alpha1().ApisixRoutes().Lister())
-
-       if cfg.Kubernetes.IngressVersion == config.IngressNetworkingV1 {
-               ingressInformer = kubeClient.SharedIndexInformerFactory.Networking().V1().Ingresses().Informer()
-       } else if cfg.Kubernetes.IngressVersion == config.IngressNetworkingV1beta1 {
-               ingressInformer = kubeClient.SharedIndexInformerFactory.Networking().V1beta1().Ingresses().Informer()
-       } else {
-               ingressInformer = kubeClient.SharedIndexInformerFactory.Extensions().V1beta1().Ingresses().Informer()
-       }
-       if cfg.Kubernetes.ApisixRouteVersion == config.ApisixRouteV2alpha1 {
-               apisixRouteInformer = kubeClient.APISIXSharedIndexInformerFactory.Apisix().V2alpha1().ApisixRoutes().Informer()
-       } else {
-               apisixRouteInformer = kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixRoutes().Informer()
-       }
-
        // recorder
        utilruntime.Must(apisixscheme.AddToScheme(scheme.Scheme))
        eventBroadcaster := record.NewBroadcaster()
@@ -183,24 +160,35 @@ func NewController(cfg *config.Config) (*Controller, error) {
                watchingNamespace: watchingNamespace,
                secretSSLMap:      new(sync.Map),
                recorder:          eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: _component}),
-
-               epInformer:                  kubeClient.SharedIndexInformerFactory.Core().V1().Endpoints().Informer(),
-               epLister:                    kubeClient.SharedIndexInformerFactory.Core().V1().Endpoints().Lister(),
-               svcInformer:                 kubeClient.SharedIndexInformerFactory.Core().V1().Services().Informer(),
-               svcLister:                   kubeClient.SharedIndexInformerFactory.Core().V1().Services().Lister(),
-               ingressLister:               ingressLister,
-               ingressInformer:             ingressInformer,
-               secretInformer:              kubeClient.SharedIndexInformerFactory.Core().V1().Secrets().Informer(),
-               secretLister:                kubeClient.SharedIndexInformerFactory.Core().V1().Secrets().Lister(),
-               apisixRouteInformer:         apisixRouteInformer,
-               apisixRouteLister:           apisixRouteLister,
-               apisixUpstreamInformer:      kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixUpstreams().Informer(),
-               apisixUpstreamLister:        kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixUpstreams().Lister(),
-               apisixTlsInformer:           kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixTlses().Informer(),
-               apisixTlsLister:             kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixTlses().Lister(),
-               apisixClusterConfigInformer: kubeClient.APISIXSharedIndexInformerFactory.Apisix().V2alpha1().ApisixClusterConfigs().Informer(),
-               apisixClusterConfigLister:   kubeClient.APISIXSharedIndexInformerFactory.Apisix().V2alpha1().ApisixClusterConfigs().Lister(),
        }
+       return c, nil
+}
+
+func (c *Controller) initWhenStartLeading() {
+       var (
+               ingressInformer     cache.SharedIndexInformer
+               apisixRouteInformer cache.SharedIndexInformer
+       )
+
+       kubeFactory := c.kubeClient.NewSharedIndexInformerFactory()
+       apisixFactory := c.kubeClient.NewAPISIXSharedIndexInformerFactory()
+
+       c.epLister = kubeFactory.Core().V1().Endpoints().Lister()
+       c.svcLister = kubeFactory.Core().V1().Services().Lister()
+       c.ingressLister = kube.NewIngressLister(
+               kubeFactory.Networking().V1().Ingresses().Lister(),
+               kubeFactory.Networking().V1beta1().Ingresses().Lister(),
+               kubeFactory.Extensions().V1beta1().Ingresses().Lister(),
+       )
+       c.secretLister = kubeFactory.Core().V1().Secrets().Lister()
+       c.apisixRouteLister = kube.NewApisixRouteLister(
+               apisixFactory.Apisix().V1().ApisixRoutes().Lister(),
+               apisixFactory.Apisix().V2alpha1().ApisixRoutes().Lister(),
+       )
+       c.apisixUpstreamLister = apisixFactory.Apisix().V1().ApisixUpstreams().Lister()
+       c.apisixTlsLister = apisixFactory.Apisix().V1().ApisixTlses().Lister()
+       c.apisixClusterConfigLister = apisixFactory.Apisix().V2alpha1().ApisixClusterConfigs().Lister()
+
        c.translator = translation.NewTranslator(&translation.TranslatorOptions{
                EndpointsLister:      c.epLister,
                ServiceLister:        c.svcLister,
@@ -208,15 +196,35 @@ func NewController(cfg *config.Config) (*Controller, error) {
                SecretLister:         c.secretLister,
        })
 
+       if c.cfg.Kubernetes.IngressVersion == config.IngressNetworkingV1 {
+               ingressInformer = kubeFactory.Networking().V1().Ingresses().Informer()
+       } else if c.cfg.Kubernetes.IngressVersion == config.IngressNetworkingV1beta1 {
+               ingressInformer = kubeFactory.Networking().V1beta1().Ingresses().Informer()
+       } else {
+               ingressInformer = kubeFactory.Extensions().V1beta1().Ingresses().Informer()
+       }
+       if c.cfg.Kubernetes.ApisixRouteVersion == config.ApisixRouteV2alpha1 {
+               apisixRouteInformer = apisixFactory.Apisix().V2alpha1().ApisixRoutes().Informer()
+       } else {
+               apisixRouteInformer = apisixFactory.Apisix().V1().ApisixRoutes().Informer()
+       }
+
+       c.epInformer = kubeFactory.Core().V1().Endpoints().Informer()
+       c.svcInformer = kubeFactory.Core().V1().Services().Informer()
+       c.ingressInformer = ingressInformer
+       c.apisixRouteInformer = apisixRouteInformer
+       c.apisixUpstreamInformer = apisixFactory.Apisix().V1().ApisixUpstreams().Informer()
+       c.apisixClusterConfigInformer = apisixFactory.Apisix().V2alpha1().ApisixClusterConfigs().Informer()
+       c.secretInformer = kubeFactory.Core().V1().Secrets().Informer()
+       c.apisixTlsInformer = apisixFactory.Apisix().V1().ApisixTlses().Informer()
+
        c.endpointsController = c.newEndpointsController()
        c.apisixUpstreamController = c.newApisixUpstreamController()
+       c.ingressController = c.newIngressController()
        c.apisixRouteController = c.newApisixRouteController()
        c.apisixClusterConfigController = c.newApisixClusterConfigController()
        c.apisixTlsController = c.newApisixTlsController()
-       c.ingressController = c.newIngressController()
        c.secretController = c.newSecretController()
-
-       return c, nil
 }
 
 // recorderEvent recorder events for resources
@@ -320,30 +328,47 @@ election:
 }
 
 func (c *Controller) run(ctx context.Context) {
-       log.Infow("controller now is running as leader",
+       log.Infow("controller tries to leading ...",
                zap.String("namespace", c.namespace),
                zap.String("pod", c.name),
        )
+
+       var cancelFunc context.CancelFunc
+       ctx, cancelFunc = context.WithCancel(ctx)
+       defer cancelFunc()
+
+       // give up leader
        defer c.leaderContextCancelFunc()
-       c.metricsCollector.ResetLeader(true)
 
-       err := c.apisix.AddCluster(&apisix.ClusterOptions{
+       clusterOpts := &apisix.ClusterOptions{
                Name:     c.cfg.APISIX.DefaultClusterName,
                AdminKey: c.cfg.APISIX.DefaultClusterAdminKey,
                BaseURL:  c.cfg.APISIX.DefaultClusterBaseURL,
-       })
+       }
+       err := c.apisix.AddCluster(clusterOpts)
        if err != nil && err != apisix.ErrDuplicatedCluster {
-               // TODO give up the leader role.
+               // TODO give up the leader role
                log.Errorf("failed to add default cluster: %s", err)
                return
        }
 
        if err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).HasSynced(ctx); err != nil {
-               // TODO give up the leader role.
+               // TODO give up the leader role
                log.Errorf("failed to wait the default cluster to be ready: %s", err)
+
+               // re-create apisix cluster, used in next c.run
+               if err = c.apisix.UpdateCluster(clusterOpts); err != nil {
+                       log.Errorf("failed to update default cluster: %s", err)
+                       return
+               }
                return
        }
 
+       c.initWhenStartLeading()
+
+       c.goAttach(func() {
+               c.checkClusterHealth(ctx, cancelFunc)
+       })
        c.goAttach(func() {
                c.epInformer.Run(ctx.Done())
        })
@@ -390,6 +415,13 @@ func (c *Controller) run(ctx context.Context) {
                c.secretController.run(ctx)
        })
 
+       c.metricsCollector.ResetLeader(true)
+
+       log.Infow("controller now is running as leader",
+               zap.String("namespace", c.namespace),
+               zap.String("pod", c.name),
+       )
+
        <-ctx.Done()
        c.wg.Wait()
 }
@@ -426,3 +458,22 @@ func (c *Controller) syncSSL(ctx context.Context, ssl *apisixv1.Ssl, event types
        }
        return err
 }
+
+func (c *Controller) checkClusterHealth(ctx context.Context, cancelFunc context.CancelFunc) {
+       defer cancelFunc()
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+               case <-time.After(5 * time.Second):
+               }
+
+               err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).HealthCheck(ctx)
+               if err != nil {
+                       // Finally failed health check, then give up leader.
+                       log.Warnf("failed to check health for default cluster: 
%s, give up leader", err)
+                       return
+               }
+               log.Debugf("health check for default cluster succeeded")
+       }
+}
diff --git a/pkg/ingress/endpoint.go b/pkg/ingress/endpoint.go
index fbf64f3..cfcea7f 100644
--- a/pkg/ingress/endpoint.go
+++ b/pkg/ingress/endpoint.go
@@ -58,6 +58,7 @@ func (c *Controller) newEndpointsController() *endpointsController {
 func (c *endpointsController) run(ctx context.Context) {
        log.Info("endpoints controller started")
        defer log.Info("endpoints controller exited")
+       defer c.workqueue.ShutDown()
 
        if ok := cache.WaitForCacheSync(ctx.Done(), c.controller.epInformer.HasSynced); !ok {
                log.Error("informers sync failed")
@@ -82,7 +83,6 @@ func (c *endpointsController) run(ctx context.Context) {
        }
 
        <-ctx.Done()
-       c.workqueue.ShutDown()
 }
 
func (c *endpointsController) sync(ctx context.Context, ev *types.Event) error {
diff --git a/pkg/ingress/ingress.go b/pkg/ingress/ingress.go
index 73077b7..6167e7d 100644
--- a/pkg/ingress/ingress.go
+++ b/pkg/ingress/ingress.go
@@ -57,6 +57,7 @@ func (c *Controller) newIngressController() *ingressController {
 func (c *ingressController) run(ctx context.Context) {
        log.Info("ingress controller started")
        defer log.Infof("ingress controller exited")
+       defer c.workqueue.ShutDown()
 
        if !cache.WaitForCacheSync(ctx.Done(), c.controller.ingressInformer.HasSynced) {
                log.Errorf("cache sync failed")
@@ -66,7 +67,6 @@ func (c *ingressController) run(ctx context.Context) {
                go c.runWorker(ctx)
        }
        <-ctx.Done()
-       c.workqueue.ShutDown()
 }
 
 func (c *ingressController) runWorker(ctx context.Context) {
diff --git a/pkg/ingress/secret.go b/pkg/ingress/secret.go
index 143eddc..8b3aec2 100644
--- a/pkg/ingress/secret.go
+++ b/pkg/ingress/secret.go
@@ -59,6 +59,7 @@ func (c *Controller) newSecretController() *secretController {
 func (c *secretController) run(ctx context.Context) {
        log.Info("secret controller started")
        defer log.Info("secret controller exited")
+       defer c.workqueue.ShutDown()
 
        if ok := cache.WaitForCacheSync(ctx.Done(), c.controller.secretInformer.HasSynced); !ok {
                log.Error("informers sync failed")
@@ -70,7 +71,6 @@ func (c *secretController) run(ctx context.Context) {
        }
 
        <-ctx.Done()
-       c.workqueue.ShutDown()
 }
 
 func (c *secretController) runWorker(ctx context.Context) {
diff --git a/pkg/kube/apisix/apis/config/v2alpha1/types.go b/pkg/kube/apisix/apis/config/v2alpha1/types.go
index 08b4a8b..1fdf482 100644
--- a/pkg/kube/apisix/apis/config/v2alpha1/types.go
+++ b/pkg/kube/apisix/apis/config/v2alpha1/types.go
@@ -20,6 +20,8 @@ import (
        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/util/intstr"
+
+       "github.com/apache/apisix-ingress-controller/pkg/types"
 )
 
 const (
@@ -299,6 +301,8 @@ type ApisixClusterAdminConfig struct {
        BaseURL string `json:"baseURL" yaml:"baseURL"`
        // AdminKey is used to verify the admin API user.
        AdminKey string `json:"adminKey" yaml:"adminKey"`
+       // ClientTimeout is request timeout for the APISIX Admin API client
+       ClientTimeout types.TimeDuration `json:"clientTimeout" yaml:"clientTimeout"`
 }
 
 // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
diff --git a/pkg/kube/init.go b/pkg/kube/init.go
index d7b21e9..55af64e 100644
--- a/pkg/kube/init.go
+++ b/pkg/kube/init.go
@@ -25,16 +25,12 @@ import (
 
// KubeClient contains some objects used to communicate with Kubernetes API Server.
 type KubeClient struct {
+       cfg *config.Config
+
        // Client is the object used to operate Kubernetes builtin resources.
        Client kubernetes.Interface
        // APISIXClient is the object used to operate resources under apisix.apache.org group.
        APISIXClient clientset.Interface
-       // SharedIndexInformerFactory is the index informer factory object used to watch and
-       // list Kubernetes builtin resources.
-       SharedIndexInformerFactory informers.SharedInformerFactory
-       // APISIXSharedIndexInformerFactory is the index informer factory object used to watch
-       // and list Kubernetes resources in apisix.apache.org group.
-       APISIXSharedIndexInformerFactory externalversions.SharedInformerFactory
 }
 
 // NewKubeClient creates a high-level Kubernetes client.
@@ -52,13 +48,22 @@ func NewKubeClient(cfg *config.Config) (*KubeClient, error) {
        if err != nil {
                return nil, err
        }
-       factory := informers.NewSharedInformerFactory(kubeClient, cfg.Kubernetes.ResyncInterval.Duration)
-       apisixFactory := externalversions.NewSharedInformerFactory(apisixKubeClient, cfg.Kubernetes.ResyncInterval.Duration)
 
        return &KubeClient{
-               Client:                           kubeClient,
-               APISIXClient:                     apisixKubeClient,
-               SharedIndexInformerFactory:       factory,
-               APISIXSharedIndexInformerFactory: apisixFactory,
+               cfg:          cfg,
+               Client:       kubeClient,
+               APISIXClient: apisixKubeClient,
        }, nil
 }
+
+// SharedIndexInformerFactory is the index informer factory object used to watch and
+// list Kubernetes builtin resources.
+func (k *KubeClient) NewSharedIndexInformerFactory() informers.SharedInformerFactory {
+       return informers.NewSharedInformerFactory(k.Client, k.cfg.Kubernetes.ResyncInterval.Duration)
+}
+
+// APISIXSharedIndexInformerFactory is the index informer factory object used to watch
+// and list Kubernetes resources in apisix.apache.org group.
+func (k *KubeClient) NewAPISIXSharedIndexInformerFactory() externalversions.SharedInformerFactory {
+       return externalversions.NewSharedInformerFactory(k.APISIXClient, k.cfg.Kubernetes.ResyncInterval.Duration)
+}
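
The pkg/kube/init.go change above is what makes the election recyclable on the Kubernetes side: instead of building the shared informer factories once in NewKubeClient, they are now created on demand, so every leadership term starts from fresh informers (informers cannot be re-run once they have stopped). A rough sketch of that pattern follows, assuming a hypothetical newLeaderTerm helper and a fake clientset standing in for the real one; the actual wiring lives in Controller.initWhenStartLeading.

package main

import (
        "context"
        "time"

        "k8s.io/client-go/informers"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/kubernetes/fake"
)

// newLeaderTerm builds a fresh factory for one leadership term and runs its
// informers until the term's context is cancelled (for example by a failed
// health check on the default APISIX cluster).
func newLeaderTerm(ctx context.Context, client kubernetes.Interface, resync time.Duration) {
        factory := informers.NewSharedInformerFactory(client, resync)

        // Informers and listers come from the per-term factory, mirroring
        // initWhenStartLeading in the diff above.
        epInformer := factory.Core().V1().Endpoints().Informer()
        svcInformer := factory.Core().V1().Services().Informer()

        go epInformer.Run(ctx.Done())
        go svcInformer.Run(ctx.Done())

        <-ctx.Done() // when leadership ends, the informers stop with the context
}

func main() {
        // Each call simulates one leadership term with its own informers.
        ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
        defer cancel()
        newLeaderTerm(ctx, fake.NewSimpleClientset(), 30*time.Second)
}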
