gxthrj commented on a change in pull request #176:
URL: 
https://github.com/apache/apisix-ingress-controller/pull/176#discussion_r559965159



##########
File path: pkg/seven/state/builder.go
##########
@@ -227,64 +221,18 @@ func SolverSingleUpstream(u *v1.Upstream, swg 
ServiceWorkerGroup, wg *sync.WaitG
                                                needToUpdate = false
                                        }
                                }
-                               if needToUpdate {
-                                       // 1.sync memDB
-                                       upstreamDB := &db.UpstreamDB{Upstreams: 
[]*v1.Upstream{u}}
-                                       if err := upstreamDB.UpdateUpstreams(); 
err != nil {
-                                               log.Errorf("solver upstream 
failed, update upstream to local db failed, err: %s", err.Error())
-                                               errNotify = err
-                                               return
-                                       }
-
-                                       // 2.sync apisix
-                                       var cluster string
-                                       if u.Group != nil {
-                                               cluster = *u.Group
-                                       }
+                               if needToUpdate || (u.FromKind != nil && 
*u.FromKind == WatchFromKind) {

Review comment:
       Need to differentiate `service registration` and `CRD synchronization`, 
   When `FromKind == WatchFromKind`, the upstream info only contains nodes, and 
will lose other information in upstream, such as loadBalance infomation.

##########
File path: pkg/apisix/cluster.go
##########
@@ -85,20 +89,129 @@ func newCluster(o *ClusterOptions) (Cluster, error) {
                                ExpectContinueTimeout: o.Timeout,
                        },
                },
+               cache:      nil,
+               cacheReady: make(chan struct{}),
        }
        c.route = newRouteClient(c)
        c.upstream = newUpstreamClient(c)
        c.service = newServiceClient(c)
        c.ssl = newSSLClient(c)
 
+       go c.warmingUp()
+
        return c, nil
 }
 
-// String exposes the client information in human readable format.
+func (c *cluster) warmingUp() {
+       log.Infow("warming up caching", zap.String("cluster", c.name))
+       now := time.Now()
+       defer log.Infow("caching warmed",
+               zap.String("cost_time", time.Now().Sub(now).String()),
+               zap.String("cluster", c.name),
+       )
+
+       backoff := wait.Backoff{
+               Duration: time.Second,
+               Factor:   2,
+               Steps:    6,
+       }
+       err := wait.ExponentialBackoff(backoff, c.warmingUpOnce)
+       if err != nil {
+               c.cacheWarmingUpErr = err
+       }
+       close(c.cacheReady)
+}
+
+func (c *cluster) warmingUpOnce() (bool, error) {
+       dbcache, err := cache.NewMemDBCache()
+       if err != nil {
+               return false, err
+       }
+       c.cache = dbcache
+
+       routes, err := c.route.List(context.TODO())
+       if err != nil {
+               log.Errorf("failed to list route in APISIX: %s", err)
+               return false, err
+       }
+       services, err := c.service.List(context.TODO())
+       if err != nil {
+               log.Errorf("failed to list services in APISIX: %s", err)
+               return false, err
+       }
+       upstreams, err := c.upstream.List(context.TODO())
+       if err != nil {
+               log.Errorf("failed to list upstreams in APISIX: %s", err)
+               return false, err
+       }
+       ssl, err := c.ssl.List(context.TODO())
+       if err != nil {
+               log.Errorf("failed to list ssl in APISIX: %s", err)
+               return false, err
+       }
+
+       for _, r := range routes {
+               if err := c.cache.InsertRoute(r); err != nil {
+                       log.Errorw("failed to insert route to cache",
+                               zap.String("route", *r.ID),
+                               zap.String("cluster", c.name),
+                               zap.String("error", err.Error()),
+                       )
+                       return false, err
+               }
+       }
+       for _, s := range services {
+               if err := c.cache.InsertService(s); err != nil {
+                       log.Errorw("failed to insert service to cache",
+                               zap.String("service", *s.ID),
+                               zap.String("cluster", c.name),
+                               zap.String("error", err.Error()),
+                       )
+                       return false, err
+               }
+       }
+       for _, u := range upstreams {
+               if err := c.cache.InsertUpstream(u); err != nil {
+                       log.Errorw("failed to insert upstream to cache",
+                               zap.String("upstream", *u.ID),
+                               zap.String("cluster", c.name),
+                               zap.String("error", err.Error()),
+                       )
+                       return false, err
+               }
+       }
+       for _, s := range ssl {
+               if err := c.cache.InsertSSL(s); err != nil {
+                       log.Errorw("failed to insert ssl to cache",
+                               zap.String("ssl", *s.ID),
+                               zap.String("cluster", c.name),
+                               zap.String("error", err.Error()),
+                       )
+                       return false, err
+               }
+       }
+       return true, nil
+}
+
+// String implements Cluster.String method.
 func (c *cluster) String() string {
        return fmt.Sprintf("name=%s; base_url=%s", c.name, c.baseURL)
 }
 
+// Ready implements Cluster.Ready method.
+func (c *cluster) Ready(ctx context.Context) error {
+       if c.cacheWarmingUpErr != nil {
+               return c.cacheWarmingUpErr
+       }
+       select {
+       case <-ctx.Done():
+               log.Errorf("failed to wait cluster to ready: %s", ctx.Err())
+               return ctx.Err()
+       case <-c.cacheReady:
+               return nil

Review comment:
       Had better to record log when ready .

##########
File path: pkg/apisix/route.go
##########
@@ -48,9 +48,17 @@ func newRouteClient(c *cluster) Route {
        }
 }
 
+// Get only looks up the cache, it's not necessary to access APISIX, since all 
resources
+// are created by Create, which reflects the change to cache in turn, so if 
resource
+// is not in cache, it's not in APISIX either.
+func (r *routeClient) Get(_ context.Context, fullname string) (*v1.Route, 
error) {
+       return r.cluster.cache.GetRoute(fullname)

Review comment:
       If cache return nil, need to read from admin api.
   Or we need a way to resync from APISIX to cache.

##########
File path: pkg/seven/state/solver.go
##########
@@ -57,92 +57,58 @@ func (s *ApisixCombination) Solver() (string, error) {
 func (s *ApisixCombination) Remove() error {
        // services
        for _, svc := range s.Services {
-               if err := RemoveService(svc); err != nil {
-                       return err
+               var cluster string
+               if svc.Group != nil {
+                       cluster = *svc.Group
                }
-       }
-
-       // upstreams
-       for _, up := range s.Upstreams {
-               if err := RemoveUpstream(up); err != nil {
-                       return err
-               }
-       }
-       return nil
-}
-
-func RemoveService(svc *v1.Service) error {
-       // find svc ID
-       serviceRequest := db.ServiceRequest{FullName: *svc.FullName}
-       if service, err := serviceRequest.FindByName(); err != nil {
-               // if not found, keep going on.
-               if errors.Is(err, utils.ErrNotFound) {
-                       return nil
-               }
-               return err
-       } else {
-               svc.ID = service.ID
-       }
-       // find ref route
-       routeRequest := db.RouteRequest{ServiceId: *svc.ID}
-       if route, err := routeRequest.ExistByServiceId(); err != nil {
-               if !errors.Is(err, utils.ErrNotFound) {
-                       // except ErrNotFound, need to retry
-                       return err
-               } else {
-                       // do delete svc
-                       var cluster string
-                       if svc.Group != nil {
-                               cluster = *svc.Group
-                       }
-                       if err := 
conf.Client.Cluster(cluster).Service().Delete(context.TODO(), svc); err != nil {
-                               log.Errorf("failed to delete svc %s from 
APISIX: %s", *svc.FullName, err)
+               svcInCache, err := 
conf.Client.Cluster(cluster).Service().Get(context.TODO(), *svc.FullName)
+               if err != nil {
+                       if err == cache.ErrNotFound {
+                               log.Errorf("failed to remove service %s: %s", 
*svc.FullName, err)
+                               continue
+                       } else {
                                return err
+                       }
+               }
+               paddingService(svc, svcInCache)
+               err = 
conf.Client.Cluster(cluster).Service().Delete(context.TODO(), svc)
+               if err != nil {
+                       if err == cache.ErrNotFound {
+                               log.Errorf("failed to remove service %s: %s", 
*svc.FullName, err)
+                       } else if err == cache.ErrStillInUse {
+                               log.Warnf("failed to remove service %s: %s", 
*svc.FullName, err)
                        } else {
-                               db := db.ServiceDB{Services: []*v1.Service{svc}}
-                               db.DeleteService()
-                               return nil
+                               return err
                        }
                }
-       } else {
-               return fmt.Errorf("svc %s is still referenced by route %s", 
*svc.FullName, *route.FullName)
        }
-}
 
-func RemoveUpstream(up *v1.Upstream) error {
-       upstreamRequest := db.UpstreamRequest{FullName: *up.FullName}
-       if upstream, err := upstreamRequest.FindByName(); err != nil {
-               // if not found, keep going on.
-               if errors.Is(err, utils.ErrNotFound) {
-                       return nil
+       // upstreams
+       for _, ups := range s.Upstreams {
+               var cluster string
+               if ups.Group != nil {
+                       cluster = *ups.Group
                }
-               return err
-       } else {
-               up.ID = upstream.ID
-       }
-       serviceRequest := db.ServiceRequest{UpstreamId: *up.ID}
-       if svc, err := serviceRequest.ExistByUpstreamId(); err != nil {

Review comment:
       Where is this logic in this refactor, need to check if upstream is refer 
by any service.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to