tokers commented on a change in pull request #176:
URL:
https://github.com/apache/apisix-ingress-controller/pull/176#discussion_r559971926
##########
File path: pkg/apisix/cluster.go
##########
@@ -85,20 +89,129 @@ func newCluster(o *ClusterOptions) (Cluster, error) {
ExpectContinueTimeout: o.Timeout,
},
},
+ cache: nil,
+ cacheReady: make(chan struct{}),
}
c.route = newRouteClient(c)
c.upstream = newUpstreamClient(c)
c.service = newServiceClient(c)
c.ssl = newSSLClient(c)
+ go c.warmingUp()
+
return c, nil
}
-// String exposes the client information in human readable format.
+func (c *cluster) warmingUp() {
+ log.Infow("warming up caching", zap.String("cluster", c.name))
+ now := time.Now()
+ defer log.Infow("caching warmed",
+ zap.String("cost_time", time.Now().Sub(now).String()),
+ zap.String("cluster", c.name),
+ )
+
+ backoff := wait.Backoff{
+ Duration: time.Second,
+ Factor: 2,
+ Steps: 6,
+ }
+ err := wait.ExponentialBackoff(backoff, c.warmingUpOnce)
+ if err != nil {
+ c.cacheWarmingUpErr = err
+ }
+ close(c.cacheReady)
+}
+
+// warmingUpOnce makes a single attempt to populate the cache. It creates a
+// fresh in-memory cache and installs it as c.cache, lists routes, services,
+// upstreams and SSL objects from APISIX (in that order), and inserts every
+// listed object into the cache.
+//
+// It returns (true, nil) when everything succeeded and (false, err) on the
+// first failure; list and insert failures are logged before returning.
+func (c *cluster) warmingUpOnce() (bool, error) {
+	memCache, err := cache.NewMemDBCache()
+	if err != nil {
+		return false, err
+	}
+	c.cache = memCache
+
+	// Fetch everything first; nothing is inserted unless all lists succeed.
+	ctx := context.TODO()
+	routeList, err := c.route.List(ctx)
+	if err != nil {
+		log.Errorf("failed to list route in APISIX: %s", err)
+		return false, err
+	}
+	serviceList, err := c.service.List(ctx)
+	if err != nil {
+		log.Errorf("failed to list services in APISIX: %s", err)
+		return false, err
+	}
+	upstreamList, err := c.upstream.List(ctx)
+	if err != nil {
+		log.Errorf("failed to list upstreams in APISIX: %s", err)
+		return false, err
+	}
+	sslList, err := c.ssl.List(ctx)
+	if err != nil {
+		log.Errorf("failed to list ssl in APISIX: %s", err)
+		return false, err
+	}
+
+	// Fill the cache, one object kind at a time.
+	for _, route := range routeList {
+		insertErr := c.cache.InsertRoute(route)
+		if insertErr == nil {
+			continue
+		}
+		log.Errorw("failed to insert route to cache",
+			zap.String("route", *route.ID),
+			zap.String("cluster", c.name),
+			zap.String("error", insertErr.Error()),
+		)
+		return false, insertErr
+	}
+	for _, service := range serviceList {
+		insertErr := c.cache.InsertService(service)
+		if insertErr == nil {
+			continue
+		}
+		log.Errorw("failed to insert service to cache",
+			zap.String("service", *service.ID),
+			zap.String("cluster", c.name),
+			zap.String("error", insertErr.Error()),
+		)
+		return false, insertErr
+	}
+	for _, upstream := range upstreamList {
+		insertErr := c.cache.InsertUpstream(upstream)
+		if insertErr == nil {
+			continue
+		}
+		log.Errorw("failed to insert upstream to cache",
+			zap.String("upstream", *upstream.ID),
+			zap.String("cluster", c.name),
+			zap.String("error", insertErr.Error()),
+		)
+		return false, insertErr
+	}
+	for _, cert := range sslList {
+		insertErr := c.cache.InsertSSL(cert)
+		if insertErr == nil {
+			continue
+		}
+		log.Errorw("failed to insert ssl to cache",
+			zap.String("ssl", *cert.ID),
+			zap.String("cluster", c.name),
+			zap.String("error", insertErr.Error()),
+		)
+		return false, insertErr
+	}
+	return true, nil
+}
+
+// String implements the Cluster.String method; it reports the cluster's name and base URL.
func (c *cluster) String() string {
return fmt.Sprintf("name=%s; base_url=%s", c.name, c.baseURL)
}
+// Ready implements Cluster.Ready method.
+func (c *cluster) Ready(ctx context.Context) error {
+ if c.cacheWarmingUpErr != nil {
+ return c.cacheWarmingUpErr
+ }
+ select {
+ case <-ctx.Done():
+ log.Errorf("failed to wait cluster to ready: %s", ctx.Err())
+ return ctx.Err()
+ case <-c.cacheReady:
+ return nil
Review comment:
OK.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org