This is an automated email from the ASF dual-hosted git repository.

kvn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git


The following commit(s) were added to refs/heads/master by this push:
     new b29dfc0  fix: add item to workqueue with delay when syncFailed (#103)
b29dfc0 is described below

commit b29dfc0a794d4747e2c4ab367eafd8a108eb2749
Author: kv <[email protected]>
AuthorDate: Thu Dec 17 18:06:31 2020 +0800

    fix: add item to workqueue with delay when syncFailed (#103)
    
    * fix: add item to workqueue with delay when syncFailed
    
    * fix: remove the dirty item from the queue when retrying
    
    * add retry when tls sync failed
    
    * fix: add logs when retry
    
    * checking for transform error
    
    * fix: the logic for resources that have been deleted when handling UPDATE
    
    * add warning logs for dirty data
---
 pkg/ingress/controller/apisix_route.go    | 49 +++++++++++------
 pkg/ingress/controller/apisix_service.go  | 87 +++++++++++++++++++++----------
 pkg/ingress/controller/apisix_tls.go      | 25 +++++++--
 pkg/ingress/controller/apisix_upstream.go |  3 +-
 4 files changed, 116 insertions(+), 48 deletions(-)

diff --git a/pkg/ingress/controller/apisix_route.go 
b/pkg/ingress/controller/apisix_route.go
index eed0825..66044c9 100644
--- a/pkg/ingress/controller/apisix_route.go
+++ b/pkg/ingress/controller/apisix_route.go
@@ -60,7 +60,7 @@ func BuildApisixRouteController(
                apisixRouteClientset: api6RouteClientset,
                apisixRouteList:      api6RouteInformer.Lister(),
                apisixRouteSynced:    api6RouteInformer.Informer().HasSynced,
-               workqueue:            
workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), 
"ApisixRoutes"),
+               workqueue:            
workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second,
 60*time.Second, 5), "ApisixRoutes"),
        }
        api6RouteInformer.Informer().AddEventHandler(
                cache.ResourceEventHandlerFuncs{
@@ -85,7 +85,7 @@ func (c *ApisixRouteController) addFunc(obj interface{}) {
 func (c *ApisixRouteController) updateFunc(oldObj, newObj interface{}) {
        oldRoute := oldObj.(*api6V1.ApisixRoute)
        newRoute := newObj.(*api6V1.ApisixRoute)
-       if oldRoute.ResourceVersion == newRoute.ResourceVersion {
+       if oldRoute.ResourceVersion >= newRoute.ResourceVersion {
                return
        }
        //c.addFunc(newObj)
@@ -100,6 +100,17 @@ func (c *ApisixRouteController) updateFunc(oldObj, newObj 
interface{}) {
 }
 
 func (c *ApisixRouteController) deleteFunc(obj interface{}) {
+       oldRoute, ok := obj.(*api6V1.ApisixRoute)
+       if !ok {
+               oldState, ok := obj.(cache.DeletedFinalStateUnknown)
+               if !ok {
+                       return
+               }
+               oldRoute, ok = oldState.Obj.(*api6V1.ApisixRoute)
+               if !ok {
+                       return
+               }
+       }
        var key string
        var err error
        key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
@@ -107,7 +118,7 @@ func (c *ApisixRouteController) deleteFunc(obj interface{}) 
{
                runtime.HandleError(err)
                return
        }
-       rqo := &RouteQueueObj{Key: key, OldObj: nil, Ope: DELETE}
+       rqo := &RouteQueueObj{Key: key, OldObj: oldRoute, Ope: DELETE}
        c.workqueue.AddRateLimited(rqo)
 }
 
@@ -135,16 +146,16 @@ func (c *ApisixRouteController) processNextWorkItem() 
bool {
        }
        err := func(obj interface{}) error {
                defer c.workqueue.Done(obj)
-               var key string
                var ok bool
                var rqo *RouteQueueObj
                if rqo, ok = obj.(*RouteQueueObj); !ok {
                        c.workqueue.Forget(obj)
                        return fmt.Errorf("expected RouteQueueObj in workqueue 
but got %#v", obj)
                }
-               // 在syncHandler中处理业务
                if err := c.syncHandler(rqo); err != nil {
-                       return fmt.Errorf("error syncing '%s': %s", key, 
err.Error())
+                       c.workqueue.AddRateLimited(obj)
+                       log.Errorf("sync route %s failed", rqo.Key)
+                       return fmt.Errorf("error syncing '%s': %s", rqo.Key, 
err.Error())
                }
 
                c.workqueue.Forget(obj)
@@ -211,18 +222,16 @@ func (c *ApisixRouteController) sync(rqo *RouteQueueObj) 
error {
                log.Errorf("invalid resource key: %s", key)
                return fmt.Errorf("invalid resource key: %s", key)
        }
-
-       apisixIngressRoute, err := 
c.apisixRouteList.ApisixRoutes(namespace).Get(name)
-       if err != nil {
-               if errors.IsNotFound(err) {
-                       log.Infof("apisixRoute %s is removed", key)
-                       return nil
-               }
-               runtime.HandleError(fmt.Errorf("failed to list apisixRoute 
%s/%s", key, err.Error()))
-               return err
-       }
        switch {
        case rqo.Ope == UPDATE:
+               apisixIngressRoute, err := 
c.apisixRouteList.ApisixRoutes(namespace).Get(name)
+               if err != nil {
+                       if errors.IsNotFound(err) {
+                               log.Errorf("apisixRoute %s is removed", key)
+                               return nil
+                       }
+                       return err // if error occurred, return
+               }
                oldApisixRoute := apisix.ApisixRoute(*rqo.OldObj)
                oldRoutes, _, _, _ := oldApisixRoute.Convert()
 
@@ -232,10 +241,16 @@ func (c *ApisixRouteController) sync(rqo *RouteQueueObj) 
error {
                rc := &state.RouteCompare{OldRoutes: oldRoutes, NewRoutes: 
newRoutes}
                return rc.Sync()
        case rqo.Ope == DELETE:
-               apisixRoute := apisix.ApisixRoute(*apisixIngressRoute)
+               apisixIngressRoute, _ := 
c.apisixRouteList.ApisixRoutes(namespace).Get(name)
+               if apisixIngressRoute != nil && 
apisixIngressRoute.ResourceVersion > rqo.OldObj.ResourceVersion {
+                       log.Warnf("Route %s has been covered when retry", 
rqo.Key)
+                       return nil
+               }
+               apisixRoute := apisix.ApisixRoute(*rqo.OldObj)
                routes, _, _, _ := apisixRoute.Convert()
                rc := &state.RouteCompare{OldRoutes: routes, NewRoutes: nil}
                return rc.Sync()
+
        default:
                return fmt.Errorf("not expected in (ApisixRouteController) 
sync")
        }
diff --git a/pkg/ingress/controller/apisix_service.go 
b/pkg/ingress/controller/apisix_service.go
index e446736..a6b2005 100644
--- a/pkg/ingress/controller/apisix_service.go
+++ b/pkg/ingress/controller/apisix_service.go
@@ -55,7 +55,7 @@ func BuildApisixServiceController(
                apisixClientset:     apisixServiceClientset,
                apisixServiceList:   apisixServiceInformer.Lister(),
                apisixServiceSynced: apisixServiceInformer.Informer().HasSynced,
-               workqueue:           
workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), 
"ApisixServices"),
+               workqueue:           
workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second,
 60*time.Second, 5), "ApisixServices"),
        }
        apisixServiceInformer.Informer().AddEventHandler(
                cache.ResourceEventHandlerFuncs{
@@ -66,6 +66,12 @@ func BuildApisixServiceController(
        return controller
 }
 
+type ServiceQueueObj struct {
+       Key    string                  `json:"key"`
+       OldObj *apisixV1.ApisixService `json:"old_obj"`
+       Ope    string                  `json:"ope"` // add / update / delete
+}
+
 func (c *ApisixServiceController) Run(stop <-chan struct{}) error {
        // 同步缓存
        if ok := cache.WaitForCacheSync(stop); !ok {
@@ -89,16 +95,16 @@ func (c *ApisixServiceController) processNextWorkItem() 
bool {
        }
        err := func(obj interface{}) error {
                defer c.workqueue.Done(obj)
-               var key string
+               var sqo *ServiceQueueObj
                var ok bool
-
-               if key, ok = obj.(string); !ok {
+               if sqo, ok = obj.(*ServiceQueueObj); !ok {
                        c.workqueue.Forget(obj)
-                       return fmt.Errorf("expected string in workqueue but got 
%#v", obj)
+                       return fmt.Errorf("expected ServiceQueueObj in 
workqueue but got %#v", obj)
                }
-               // 在syncHandler中处理业务
-               if err := c.syncHandler(key); err != nil {
-                       return fmt.Errorf("error syncing '%s': %s", key, 
err.Error())
+               if err := c.syncHandler(sqo); err != nil {
+                       c.workqueue.AddRateLimited(obj)
+                       log.Errorf("sync service %s failed", sqo.Key)
+                       return fmt.Errorf("error syncing '%s': %s", sqo.Key, 
err.Error())
                }
 
                c.workqueue.Forget(obj)
@@ -110,24 +116,30 @@ func (c *ApisixServiceController) processNextWorkItem() 
bool {
        return true
 }
 
-func (c *ApisixServiceController) syncHandler(key string) error {
-       namespace, name, err := cache.SplitMetaNamespaceKey(key)
+func (c *ApisixServiceController) syncHandler(sqo *ServiceQueueObj) error {
+       namespace, name, err := cache.SplitMetaNamespaceKey(sqo.Key)
        if err != nil {
-               log.Errorf("invalid resource key: %s", key)
-               return fmt.Errorf("invalid resource key: %s", key)
+               log.Errorf("invalid resource key: %s", sqo.Key)
+               return fmt.Errorf("invalid resource key: %s", sqo.Key)
        }
-
-       apisixServiceYaml, err := 
c.apisixServiceList.ApisixServices(namespace).Get(name)
-       if err != nil {
-               if errors.IsNotFound(err) {
-                       log.Infof("apisixUpstream %s is removed", key)
+       apisixServiceYaml := sqo.OldObj
+       if sqo.Ope == DELETE {
+               apisixIngressService, _ := 
c.apisixServiceList.ApisixServices(namespace).Get(name)
+               if apisixIngressService != nil && 
apisixIngressService.ResourceVersion > sqo.OldObj.ResourceVersion {
+                       log.Warnf("Service %s has been covered when retry", 
sqo.Key)
                        return nil
                }
-               runtime.HandleError(fmt.Errorf("failed to list apisixUpstream 
%s/%s", key, err.Error()))
-               return err
+       } else {
+               apisixServiceYaml, err = 
c.apisixServiceList.ApisixServices(namespace).Get(name)
+               if err != nil {
+                       if errors.IsNotFound(err) {
+                               log.Infof("apisixUpstream %s is removed", 
sqo.Key)
+                               return nil
+                       }
+                       runtime.HandleError(fmt.Errorf("failed to list 
apisixUpstream %s/%s", sqo.Key, err.Error()))
+                       return err
+               }
        }
-       log.Info(namespace)
-       log.Info(name)
        apisixService := apisix.ApisixServiceCRD(*apisixServiceYaml)
        services, upstreams, _ := apisixService.Convert()
        comb := state.ApisixCombination{Routes: nil, Services: services, 
Upstreams: upstreams}
@@ -142,19 +154,39 @@ func (c *ApisixServiceController) addFunc(obj 
interface{}) {
                runtime.HandleError(err)
                return
        }
-       c.workqueue.AddRateLimited(key)
+       sqo := &RouteQueueObj{Key: key, OldObj: nil, Ope: ADD}
+       c.workqueue.AddRateLimited(sqo)
 }
 
 func (c *ApisixServiceController) updateFunc(oldObj, newObj interface{}) {
-       oldRoute := oldObj.(*apisixV1.ApisixService)
-       newRoute := newObj.(*apisixV1.ApisixService)
-       if oldRoute.ResourceVersion == newRoute.ResourceVersion {
+       oldService := oldObj.(*apisixV1.ApisixService)
+       newService := newObj.(*apisixV1.ApisixService)
+       if oldService.ResourceVersion >= newService.ResourceVersion {
                return
        }
-       c.addFunc(newObj)
+       var key string
+       var err error
+       if key, err = cache.MetaNamespaceKeyFunc(newObj); err != nil {
+               runtime.HandleError(err)
+               return
+       }
+       sqo := &ServiceQueueObj{Key: key, OldObj: oldService, Ope: UPDATE}
+       c.workqueue.AddRateLimited(sqo)
 }
 
 func (c *ApisixServiceController) deleteFunc(obj interface{}) {
+       oldService, ok := obj.(*apisixV1.ApisixService)
+       if !ok {
+               oldState, ok := obj.(cache.DeletedFinalStateUnknown)
+               if !ok {
+                       return
+               }
+               oldService, ok = oldState.Obj.(*apisixV1.ApisixService)
+               if !ok {
+                       return
+               }
+       }
+
        var key string
        var err error
        key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
@@ -162,5 +194,6 @@ func (c *ApisixServiceController) deleteFunc(obj 
interface{}) {
                runtime.HandleError(err)
                return
        }
-       c.workqueue.AddRateLimited(key)
+       sqo := &ServiceQueueObj{Key: key, OldObj: oldService, Ope: DELETE}
+       c.workqueue.AddRateLimited(sqo)
 }
diff --git a/pkg/ingress/controller/apisix_tls.go 
b/pkg/ingress/controller/apisix_tls.go
index fe2a144..17f0405 100644
--- a/pkg/ingress/controller/apisix_tls.go
+++ b/pkg/ingress/controller/apisix_tls.go
@@ -60,7 +60,7 @@ func BuildApisixTlsController(
                apisixClientset: apisixTlsClientset,
                apisixTlsList:   apisixTlsInformer.Lister(),
                apisixTlsSynced: apisixTlsInformer.Informer().HasSynced,
-               workqueue:       
workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), 
"ApisixTlses"),
+               workqueue:       
workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second,
 60*time.Second, 5), "ApisixTlses"),
        }
        apisixTlsInformer.Informer().AddEventHandler(
                cache.ResourceEventHandlerFuncs{
@@ -102,6 +102,8 @@ func (c *ApisixTlsController) processNextWorkItem() bool {
                        return fmt.Errorf("expected TlsQueueObj in workqueue 
but got %#v", obj)
                }
                if err := c.syncHandler(tqo); err != nil {
+                       c.workqueue.AddRateLimited(tqo)
+                       log.Errorf("sync tls %s failed", tqo.Key)
                        return fmt.Errorf("error syncing '%s': %s", key, 
err.Error())
                }
 
@@ -121,7 +123,13 @@ func (c *ApisixTlsController) syncHandler(tqo 
*TlsQueueObj) error {
                return fmt.Errorf("invalid resource key: %s", tqo.Key)
        }
        apisixTlsYaml := tqo.OldObj
-       if tqo.Ope != state.Delete {
+       if tqo.Ope == state.Delete {
+               apisixIngressTls, _ := 
c.apisixTlsList.ApisixTlses(namespace).Get(name)
+               if apisixIngressTls != nil && apisixIngressTls.ResourceVersion 
> tqo.OldObj.ResourceVersion {
+                       log.Warnf("TLS %s has been covered when retry", tqo.Key)
+                       return nil
+               }
+       } else {
                apisixTlsYaml, err = 
c.apisixTlsList.ApisixTlses(namespace).Get(name)
                if err != nil {
                        if errors.IsNotFound(err) {
@@ -132,6 +140,7 @@ func (c *ApisixTlsController) syncHandler(tqo *TlsQueueObj) 
error {
                        return err
                }
        }
+
        apisixTls := apisix.ApisixTlsCRD(*apisixTlsYaml)
        sc := &apisix.SecretClient{}
        if tls, err := apisixTls.Convert(sc); err != nil {
@@ -173,7 +182,17 @@ func (c *ApisixTlsController) updateFunc(oldObj, newObj 
interface{}) {
 }
 
 func (c *ApisixTlsController) deleteFunc(obj interface{}) {
-       oldTls := obj.(cache.DeletedFinalStateUnknown).Obj.(*apisixV1.ApisixTls)
+       oldTls, ok := obj.(*apisixV1.ApisixTls)
+       if !ok {
+               oldState, ok := obj.(cache.DeletedFinalStateUnknown)
+               if !ok {
+                       return
+               }
+               oldTls, ok = oldState.Obj.(*apisixV1.ApisixTls)
+               if !ok {
+                       return
+               }
+       }
        var key string
        var err error
        key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
diff --git a/pkg/ingress/controller/apisix_upstream.go 
b/pkg/ingress/controller/apisix_upstream.go
index e9436be..092f63e 100644
--- a/pkg/ingress/controller/apisix_upstream.go
+++ b/pkg/ingress/controller/apisix_upstream.go
@@ -56,7 +56,7 @@ func BuildApisixUpstreamController(
                apisixClientset:      apisixUpstreamClientset,
                apisixUpstreamList:   apisixUpstreamInformer.Lister(),
                apisixUpstreamSynced: 
apisixUpstreamInformer.Informer().HasSynced,
-               workqueue:            
workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), 
"ApisixUpstreams"),
+               workqueue:            
workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second,
 60*time.Second, 5), "ApisixUpstreams"),
        }
        apisixUpstreamInformer.Informer().AddEventHandler(
                cache.ResourceEventHandlerFuncs{
@@ -99,6 +99,7 @@ func (c *ApisixUpstreamController) processNextWorkItem() bool 
{
                }
                // 在syncHandler中处理业务
                if err := c.syncHandler(key); err != nil {
+                       c.workqueue.AddRateLimited(obj)
                        return fmt.Errorf("error syncing '%s': %s", key, 
err.Error())
                }
 

Reply via email to