tokers commented on a change in pull request #453:
URL: 
https://github.com/apache/apisix-ingress-controller/pull/453#discussion_r633025408



##########
File path: pkg/apisix/cluster.go
##########
@@ -290,6 +303,39 @@ func (c *cluster) GlobalRule() GlobalRule {
        return c.globalRules
 }
 
+// HealthCheck implements Cluster.HealthCheck method.
+func (c *cluster) HealthCheck(ctx context.Context, backoff wait.Backoff) (err 
error) {
+       if c.cacheSyncErr != nil {
+               err = c.cacheSyncErr
+               return
+       }
+       if atomic.LoadInt32(&c.cacheState) == _cacheSyncing {
+               return
+       }
+       var lastCheckErr error
+       err = wait.ExponentialBackoffWithContext(ctx, backoff, func() (done 
bool, _ error) {
+               if lastCheckErr = c.healthCheck(ctx); lastCheckErr != nil {
+                       log.Warnf("failed to HealthCheck for cluster %s: %s, 
will retry", c.name, lastCheckErr)
+                       return
+               }
+               done = true
+               return
+       })
+       if err != nil {
+               // if ErrWaitTimeout then set lastSyncErr
+               c.cacheSyncErr = lastCheckErr
+       }
+       return err
+}
+
+func (c *cluster) healthCheck(ctx context.Context) (err error) {
+       // TODO

Review comment:
       Just using a TCP socket probe is OK.

##########
File path: pkg/ingress/controller.go
##########
@@ -423,3 +449,26 @@ func (c *Controller) syncSSL(ctx context.Context, ssl 
*apisixv1.Ssl, event types
        }
        return err
 }
+
+func (c *Controller) checkClusterHealthy(ctx context.Context) {
+       for {
+               select {
+               case <-ctx.Done():
+               case <-time.After(5 * time.Second):
+               }
+
+               // Retry three times in a row, and exit if all of them fail.
+               backoff := wait.Backoff{
+                       Duration: 5 * time.Second,
+                       Factor:   1,
+                       Steps:    3,
+               }
+               err := 
c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).HealthCheck(ctx, backoff)
+               if err != nil {
+                       // Finally failed health check, then give up leader.
+                       c.leaderContextCancelFunc()
+                       log.Warnf("failed to HealthCheck for default cluster: 
%s, give up leader", err)

Review comment:
       ```suggestion
                        log.Warnf("failed to check health for default cluster: 
%s, give up leader", err)
   ```

##########
File path: pkg/kube/init.go
##########
@@ -25,16 +25,12 @@ import (
 
 // KubeClient contains some objects used to communicate with Kubernetes API 
Server.
 type KubeClient struct {
+       cfg *config.Config
+
        // Client is the object used to operate Kubernetes builtin resources.
        Client kubernetes.Interface
        // APISIXClient is the object used to operate resources under 
apisix.apache.org group.
        APISIXClient clientset.Interface
-       // SharedIndexInformerFactory is the index informer factory object used 
to watch and

Review comment:
       Why change this?

##########
File path: pkg/config/config.go
##########
@@ -85,6 +85,8 @@ type APISIXConfig struct {
        // DefaultClusterAdminKey is the admin key for the default cluster.
        // TODO: Obsolete the plain way to specify admin_key, which is insecure.
        DefaultClusterAdminKey string `json:"default_cluster_admin_key" 
yaml:"default_cluster_admin_key"`
+       // DefaultClusterClientTimeout is the request timeout for default 
cluster client.
+       DefaultClusterClientTimeout types.TimeDuration 
`json:"default_cluster_client_timeout" yaml:"default_cluster_client_timeout"`

Review comment:
       We will introduce `ApisixClusterConfig` to set these options, so don't 
add it here.

##########
File path: pkg/apisix/cluster.go
##########
@@ -50,6 +50,19 @@ var (
        ErrDuplicatedCluster = errors.New("duplicated cluster")
 
        _errReadOnClosedResBody = errors.New("http: read on closed response 
body")
+
+       // Default shared transport if apisix client
+       defaultTransport = &http.Transport{

Review comment:
       ```suggestion
        _defaultTransport = &http.Transport{
   ```

##########
File path: pkg/apisix/apisix.go
##########
@@ -19,6 +19,7 @@ import (
        "sync"
 
        v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
+       "k8s.io/apimachinery/pkg/util/wait"

Review comment:
       By convention, we put third-party packages in the middle of the `import` block.
   
   ```go
   import (
        std
        ......
   
        "k8s.io/apimachinery/pkg/util/wait"
   
        v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
   )
   ```

##########
File path: pkg/ingress/apisix_cluster_config.go
##########
@@ -62,7 +64,6 @@ func (c *apisixClusterConfigController) run(ctx 
context.Context) {
                go c.runWorker(ctx)
        }
        <-ctx.Done()
-       c.workqueue.ShutDown()

Review comment:
       This is not related to this PR's change. If you want to change them to use 
`defer`, please do that in another PR.

##########
File path: pkg/ingress/controller.go
##########
@@ -317,30 +326,42 @@ election:
 }
 
 func (c *Controller) run(ctx context.Context) {
-       log.Infow("controller now is running as leader",
+       log.Infow("controller is start leading ...",

Review comment:
       ```suggestion
        log.Infow("controller tries to leading ...",
   ```

##########
File path: pkg/ingress/controller.go
##########
@@ -317,30 +326,42 @@ election:
 }
 
 func (c *Controller) run(ctx context.Context) {
-       log.Infow("controller now is running as leader",
+       log.Infow("controller is start leading ...",
                zap.String("namespace", c.namespace),
                zap.String("pod", c.name),
        )
+
        defer c.leaderContextCancelFunc()
        c.metricsCollector.ResetLeader(true)
 
-       err := c.apisix.AddCluster(&apisix.ClusterOptions{
+       clusterOpts := &apisix.ClusterOptions{
                Name:     c.cfg.APISIX.DefaultClusterName,
                AdminKey: c.cfg.APISIX.DefaultClusterAdminKey,
                BaseURL:  c.cfg.APISIX.DefaultClusterBaseURL,
-       })
+               Timeout:  c.cfg.APISIX.DefaultClusterClientTimeout.Duration,
+       }
+       err := c.apisix.AddCluster(clusterOpts)
        if err != nil && err != apisix.ErrDuplicatedCluster {
-               // TODO give up the leader role.

Review comment:
       Don't remove the TODO unless you have actually implemented this.

##########
File path: pkg/apisix/nonexistentclient.go
##########
@@ -20,6 +20,7 @@ import (
 
        "github.com/apache/apisix-ingress-controller/pkg/apisix/cache"
        v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
+       "k8s.io/apimachinery/pkg/util/wait"

Review comment:
       Ditto with the `import` convention.

##########
File path: pkg/ingress/controller.go
##########
@@ -317,30 +326,42 @@ election:
 }
 
 func (c *Controller) run(ctx context.Context) {
-       log.Infow("controller now is running as leader",
+       log.Infow("controller is start leading ...",
                zap.String("namespace", c.namespace),
                zap.String("pod", c.name),
        )
+
        defer c.leaderContextCancelFunc()
        c.metricsCollector.ResetLeader(true)
 
-       err := c.apisix.AddCluster(&apisix.ClusterOptions{
+       clusterOpts := &apisix.ClusterOptions{
                Name:     c.cfg.APISIX.DefaultClusterName,
                AdminKey: c.cfg.APISIX.DefaultClusterAdminKey,
                BaseURL:  c.cfg.APISIX.DefaultClusterBaseURL,
-       })
+               Timeout:  c.cfg.APISIX.DefaultClusterClientTimeout.Duration,
+       }
+       err := c.apisix.AddCluster(clusterOpts)
        if err != nil && err != apisix.ErrDuplicatedCluster {
-               // TODO give up the leader role.
                log.Errorf("failed to add default cluster: %s", err)
                return
        }
 
        if err := 
c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).HasSynced(ctx); err != nil {
-               // TODO give up the leader role.

Review comment:
       Ditto.

##########
File path: pkg/ingress/controller.go
##########
@@ -180,40 +158,71 @@ func NewController(cfg *config.Config) (*Controller, 
error) {
                watchingNamespace: watchingNamespace,
                secretSSLMap:      new(sync.Map),
                recorder:          eventBroadcaster.NewRecorder(scheme.Scheme, 
v1.EventSource{Component: _component}),
-
-               epInformer:                  
kubeClient.SharedIndexInformerFactory.Core().V1().Endpoints().Informer(),
-               epLister:                    
kubeClient.SharedIndexInformerFactory.Core().V1().Endpoints().Lister(),
-               svcInformer:                 
kubeClient.SharedIndexInformerFactory.Core().V1().Services().Informer(),
-               svcLister:                   
kubeClient.SharedIndexInformerFactory.Core().V1().Services().Lister(),
-               ingressLister:               ingressLister,
-               ingressInformer:             ingressInformer,
-               secretInformer:              
kubeClient.SharedIndexInformerFactory.Core().V1().Secrets().Informer(),
-               secretLister:                
kubeClient.SharedIndexInformerFactory.Core().V1().Secrets().Lister(),
-               apisixRouteInformer:         apisixRouteInformer,
-               apisixRouteLister:           apisixRouteLister,
-               apisixUpstreamInformer:      
kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixUpstreams().Informer(),
-               apisixUpstreamLister:        
kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixUpstreams().Lister(),
-               apisixTlsInformer:           
kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixTlses().Informer(),
-               apisixTlsLister:             
kubeClient.APISIXSharedIndexInformerFactory.Apisix().V1().ApisixTlses().Lister(),
-               apisixClusterConfigInformer: 
kubeClient.APISIXSharedIndexInformerFactory.Apisix().V2alpha1().ApisixClusterConfigs().Informer(),
-               apisixClusterConfigLister:   
kubeClient.APISIXSharedIndexInformerFactory.Apisix().V2alpha1().ApisixClusterConfigs().Lister(),
        }
+       return c, nil
+}
+
+func (c *Controller) initWhenStartLeader() {

Review comment:
       ```suggestion
   func (c *Controller) initWhenStartLeading() {
   ```

##########
File path: pkg/kube/apisix/apis/config/v2alpha1/types.go
##########
@@ -17,6 +17,7 @@ package v2alpha1
 import (
        "encoding/json"
 
+       "github.com/apache/apisix-ingress-controller/pkg/types"

Review comment:
       Put our own packages at the bottom of `import`.

##########
File path: pkg/ingress/controller.go
##########
@@ -423,3 +449,26 @@ func (c *Controller) syncSSL(ctx context.Context, ssl 
*apisixv1.Ssl, event types
        }
        return err
 }
+
+func (c *Controller) checkClusterHealthy(ctx context.Context) {

Review comment:
       ```suggestion
   func (c *Controller) checkClusterHealth(ctx context.Context) {
   ```

##########
File path: pkg/apisix/apisix.go
##########
@@ -50,6 +51,8 @@ type Cluster interface {
        String() string
        // HasSynced checks whether all resources in APISIX cluster is synced 
to cache.
        HasSynced(context.Context) error
+       // HealthCheck checks apisix cluster health in realtime.
+       HealthCheck(context.Context, wait.Backoff) error

Review comment:
       IMHO, we should not accept a `wait.Backoff` object from the caller. What 
we are trying to do is hide such details and just tell the caller whether the 
cluster is healthy or not.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to