This is an automated email from the ASF dual-hosted git repository.

tokers pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git


The following commit(s) were added to refs/heads/master by this push:
     new 8bcbc60  chore: refactor endpoints controller and fix the bug in endpoints deletion (#216)
8bcbc60 is described below

commit 8bcbc60a0fc392c735e7a0dd64c581c91729678f
Author: Alex Zhang <[email protected]>
AuthorDate: Fri Jan 29 16:37:16 2021 +0800

    chore: refactor endpoints controller and fix the bug in endpoints deletion (#216)
---
 pkg/ingress/controller/controller.go               |  19 ++-
 pkg/ingress/controller/endpoint.go                 | 140 ++++++++++++---------
 pkg/ingress/controller/{watch.go => types.go}      |   0
 .../controller/watch.go => types/event.go}         |  35 +++++-
 .../store/store.go => types/event_test.go}         |  23 +++-
 test/e2e/endpoints/endpoints.go                    |  28 +++++
 test/e2e/scaffold/httpbin.go                       |   8 ++
 7 files changed, 181 insertions(+), 72 deletions(-)

diff --git a/pkg/ingress/controller/controller.go b/pkg/ingress/controller/controller.go
index 4b86a60..fa2d8c0 100644
--- a/pkg/ingress/controller/controller.go
+++ b/pkg/ingress/controller/controller.go
@@ -26,6 +26,7 @@ import (
        "k8s.io/apimachinery/pkg/runtime"
        "k8s.io/client-go/informers"
        "k8s.io/client-go/kubernetes"
+       listerscorev1 "k8s.io/client-go/listers/core/v1"
        "k8s.io/client-go/tools/cache"
        "k8s.io/client-go/tools/leaderelection"
        "k8s.io/client-go/tools/leaderelection/resourcelock"
@@ -64,6 +65,10 @@ type Controller struct {
        crdController      *Api6Controller
        crdInformerFactory externalversions.SharedInformerFactory
 
+       // informers and listers
+       epInformer cache.SharedIndexInformer
+       epLister   listerscorev1.EndpointsLister
+
        endpointsController *endpointsController
 }
 
@@ -112,9 +117,12 @@ func NewController(cfg *config.Config) (*Controller, error) {
                crdClientset:       crdClientset,
                crdInformerFactory: sharedInformerFactory,
                watchingNamespace:  watchingNamespace,
+
+               epInformer: kube.CoreSharedInformerFactory.Core().V1().Endpoints().Informer(),
+               epLister:   kube.CoreSharedInformerFactory.Core().V1().Endpoints().Lister(),
        }
 
-       c.endpointsController = c.newEndpointsController(kube.CoreSharedInformerFactory)
+       c.endpointsController = c.newEndpointsController(c.epInformer, c.epLister)
        return c, nil
 }
 
@@ -219,6 +227,7 @@ func (c *Controller) run(ctx context.Context) {
                log.Errorf("failed to add default cluster: %s", err)
                return
        }
+
        if err := c.apisix.Cluster("").HasSynced(ctx); err != nil {
                // TODO give up the leader role.
                log.Errorf("failed to wait the default cluster to be ready: %s", err)
@@ -226,9 +235,11 @@ func (c *Controller) run(ctx context.Context) {
        }
 
        c.goAttach(func() {
-               if err := c.endpointsController.run(ctx); err != nil {
-                       log.Errorf("failed to run endpoints controller: %s", err.Error())
-               }
+               c.epInformer.Run(ctx.Done())
+       })
+
+       c.goAttach(func() {
+               c.endpointsController.run(ctx)
        })
 
        ac := &Api6Controller{
diff --git a/pkg/ingress/controller/endpoint.go b/pkg/ingress/controller/endpoint.go
index c7befd0..6b55294 100644
--- a/pkg/ingress/controller/endpoint.go
+++ b/pkg/ingress/controller/endpoint.go
@@ -16,13 +16,10 @@ package controller
 
 import (
        "context"
-       "errors"
        "fmt"
 
        "go.uber.org/zap"
        corev1 "k8s.io/api/core/v1"
-       k8serrors "k8s.io/apimachinery/pkg/api/errors"
-       "k8s.io/client-go/informers"
        listerscorev1 "k8s.io/client-go/listers/core/v1"
        "k8s.io/client-go/tools/cache"
        "k8s.io/client-go/util/workqueue"
@@ -31,11 +28,14 @@ import (
        apisixcache "github.com/api7/ingress-controller/pkg/apisix/cache"
        "github.com/api7/ingress-controller/pkg/log"
        "github.com/api7/ingress-controller/pkg/seven/state"
+       "github.com/api7/ingress-controller/pkg/types"
        apisixv1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
 )
 
 const (
        _defaultNodeWeight = 100
+       // maxRetries is the number of times an object will be retried before it is dropped out of the queue.
+       _maxRetries = 10
 )
 
 type endpointsController struct {
@@ -43,14 +43,16 @@ type endpointsController struct {
        informer   cache.SharedIndexInformer
        lister     listerscorev1.EndpointsLister
        workqueue  workqueue.RateLimitingInterface
+       workers    int
 }
 
-func (c *Controller) newEndpointsController(factory informers.SharedInformerFactory) *endpointsController {
+func (c *Controller) newEndpointsController(informer cache.SharedIndexInformer, lister listerscorev1.EndpointsLister) *endpointsController {
        ctl := &endpointsController{
                controller: c,
-               informer:   factory.Core().V1().Endpoints().Informer(),
-               lister:     factory.Core().V1().Endpoints().Lister(),
+               informer:   informer,
+               lister:     lister,
                workqueue:  workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoints"),
+               workers:    1,
        }
 
        ctl.informer.AddEventHandler(
@@ -64,71 +66,49 @@ func (c *Controller) newEndpointsController(factory informers.SharedInformerFact
        return ctl
 }
 
-func (c *endpointsController) run(ctx context.Context) error {
+func (c *endpointsController) run(ctx context.Context) {
        log.Info("endpoints controller started")
        defer log.Info("endpoints controller exited")
 
-       go func() {
-               c.informer.Run(ctx.Done())
-       }()
-
        if ok := cache.WaitForCacheSync(ctx.Done(), c.informer.HasSynced); !ok {
-               return errors.New("endpoints informers cache sync failed")
+               log.Error("informers sync failed")
+               return
        }
 
-       for {
-               obj, shutdown := c.workqueue.Get()
-               if shutdown {
-                       return nil
-               }
-
-               var (
-                       err error
-               )
-
-               key, ok := obj.(string)
-               if !ok {
-                       log.Errorf("found endpoints object with unexpected type %T, ignore it", obj)
-                       c.workqueue.Forget(obj)
-               } else {
-                       err = c.process(ctx, key)
-               }
-
-               c.workqueue.Done(obj)
+       handler := func() {
+               for {
+                       obj, shutdown := c.workqueue.Get()
+                       if shutdown {
+                               return
+                       }
 
-               if err != nil {
-                       log.Warnf("endpoints %s retried since %s", key, err)
-                       c.retry(obj)
+                       err := c.sync(ctx, obj.(*types.Event))
+                       c.workqueue.Done(obj)
+                       c.handleSyncErr(obj, err)
                }
        }
-}
 
-func (c *endpointsController) process(ctx context.Context, key string) error {
-       namespace, name, err := cache.SplitMetaNamespaceKey(key)
-       if err != nil {
-               log.Errorf("found endpoints objects with malformed namespace/name: %s, ignore it", err)
-               return nil
+       for i := 0; i < c.workers; i++ {
+               go handler()
        }
 
-       ep, err := c.lister.Endpoints(namespace).Get(name)
-       if err != nil {
-               if k8serrors.IsNotFound(err) {
-                       log.Warnf("endpoints %s was removed before it can be processed", key)
-                       return nil
-               }
-               log.Errorf("failed to get endpoints %s: %s", key, err)
-               return err
-       }
-       return c.sync(ctx, ep)
+       <-ctx.Done()
+       c.workqueue.ShutDown()
 }
 
-func (c *endpointsController) sync(ctx context.Context, ep *corev1.Endpoints) error {
+func (c *endpointsController) sync(ctx context.Context, ev *types.Event) error {
+       ep := ev.Object.(*corev1.Endpoints)
        clusters := c.controller.apisix.ListClusters()
        for _, s := range ep.Subsets {
                for _, port := range s.Ports {
+                       // FIXME this is wrong, we should use the port name as the key.
                        upstream := fmt.Sprintf("%s_%s_%d", ep.Namespace, ep.Name, port.Port)
                        for _, cluster := range clusters {
-                               if err := c.syncToCluster(ctx, upstream, cluster, s.Addresses, int(port.Port)); err != nil {
+                               var addresses []corev1.EndpointAddress
+                               if ev.Type != types.EventDelete {
+                                       addresses = s.Addresses
+                               }
+                               if err := c.syncToCluster(ctx, upstream, cluster, addresses, int(port.Port)); err != nil {
                                        return err
                                }
                        }
@@ -186,8 +166,18 @@ func (c *endpointsController) syncToCluster(ctx context.Context, upstreamName st
        return nil
 }
 
-func (c *endpointsController) retry(obj interface{}) {
-       c.workqueue.AddRateLimited(obj)
+func (c *endpointsController) handleSyncErr(obj interface{}, err error) {
+       if err == nil {
+               c.workqueue.Forget(obj)
+               return
+       }
+       if c.workqueue.NumRequeues(obj) < _maxRetries {
+               log.Infof("sync endpoints %+v failed, will retry", obj)
+               c.workqueue.AddRateLimited(obj)
+       } else {
+               c.workqueue.Forget(obj)
+               log.Warnf("drop endpoints %+v out of the queue", obj)
+       }
 }
 
 func (c *endpointsController) onAdd(obj interface{}) {
@@ -199,7 +189,11 @@ func (c *endpointsController) onAdd(obj interface{}) {
        if !c.controller.namespaceWatching(key) {
                return
        }
-       c.workqueue.AddRateLimited(key)
+
+       c.workqueue.AddRateLimited(&types.Event{
+               Type:   types.EventAdd,
+               Object: obj,
+       })
 }
 
 func (c *endpointsController) onUpdate(prev, curr interface{}) {
@@ -209,17 +203,39 @@ func (c *endpointsController) onUpdate(prev, curr interface{}) {
        if prevEp.GetResourceVersion() == currEp.GetResourceVersion() {
                return
        }
-       c.onAdd(currEp)
-}
-
-func (c *endpointsController) onDelete(obj interface{}) {
-       key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+       key, err := cache.MetaNamespaceKeyFunc(currEp)
        if err != nil {
-               log.Errorf("failed to find the final state before deletion: %s", err)
+               log.Errorf("found endpoints object with bad namespace/name: %s, ignore it", err)
                return
        }
        if !c.controller.namespaceWatching(key) {
                return
        }
-       c.workqueue.AddRateLimited(key)
+       c.workqueue.AddRateLimited(&types.Event{
+               Type:   types.EventUpdate,
+               Object: curr,
+       })
+}
+
+func (c *endpointsController) onDelete(obj interface{}) {
+       ep, ok := obj.(*corev1.Endpoints)
+       if !ok {
+               tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
+               if !ok {
+                       log.Errorf("found endpoints: %+v in bad tombstone state", obj)
+                       return
+               }
+               ep = tombstone.Obj.(*corev1.Endpoints)
+       }
+
+       // FIXME Refactor Controller.namespaceWatching to just use
+       // namespace after all controllers use the same way to fetch
+       // the object.
+       if !c.controller.namespaceWatching(ep.Namespace + "/" + ep.Name) {
+               return
+       }
+       c.workqueue.AddRateLimited(&types.Event{
+               Type:   types.EventDelete,
+               Object: ep,
+       })
 }
diff --git a/pkg/ingress/controller/watch.go b/pkg/ingress/controller/types.go
similarity index 100%
copy from pkg/ingress/controller/watch.go
copy to pkg/ingress/controller/types.go
diff --git a/pkg/ingress/controller/watch.go b/pkg/types/event.go
similarity index 61%
rename from pkg/ingress/controller/watch.go
rename to pkg/types/event.go
index d857ef6..7535ba5 100644
--- a/pkg/ingress/controller/watch.go
+++ b/pkg/types/event.go
@@ -12,11 +12,36 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
-package controller
+
+package types
+
+// EventType is the type of event.
+type EventType int
 
 const (
-       ADD           = "ADD"
-       UPDATE        = "UPDATE"
-       DELETE        = "DELETE"
-       WatchFromKind = "watch"
+       // EventAdd means an add event.
+       EventAdd = iota + 1
+       // EventUpdate means an update event.
+       EventUpdate
+       // EventDelete means a delete event.
+       EventDelete
 )
+
+func (ev EventType) String() string {
+       switch ev {
+       case EventAdd:
+               return "add"
+       case EventUpdate:
+               return "update"
+       case EventDelete:
+               return "delete"
+       default:
+               return "unknown"
+       }
+}
+
+// Event represents a typed event.
+type Event struct {
+       Type   EventType
+       Object interface{}
+}
diff --git a/pkg/ingress/controller/store/store.go b/pkg/types/event_test.go
similarity index 68%
rename from pkg/ingress/controller/store/store.go
rename to pkg/types/event_test.go
index 6d7a394..cd009e7 100644
--- a/pkg/ingress/controller/store/store.go
+++ b/pkg/types/event_test.go
@@ -12,4 +12,25 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
-package store
+
+package types
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+func TestEvent(t *testing.T) {
+       ev := Event{}
+       assert.Equal(t, ev.Type.String(), "unknown")
+
+       ev.Type = EventAdd
+       assert.Equal(t, ev.Type.String(), "add")
+
+       ev.Type = EventDelete
+       assert.Equal(t, ev.Type.String(), "delete")
+
+       ev.Type = EventUpdate
+       assert.Equal(t, ev.Type.String(), "update")
+}
diff --git a/test/e2e/endpoints/endpoints.go b/test/e2e/endpoints/endpoints.go
index 39921df..8ada318 100644
--- a/test/e2e/endpoints/endpoints.go
+++ b/test/e2e/endpoints/endpoints.go
@@ -16,6 +16,7 @@ package endpoints
 
 import (
        "fmt"
+       "net/http"
        "time"
 
        "github.com/api7/ingress-controller/test/e2e/scaffold"
@@ -43,4 +44,31 @@ spec:
                assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(ups))
                assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixUpstreamsCreated(1), "checking number of upstreams")
        })
+
+       ginkgo.It("upstream nodes should be reset to empty when Service/Endpoints was deleted", func() {
+               backendSvc, backendSvcPort := s.DefaultHTTPBackend()
+               apisixRoute := fmt.Sprintf(`
+apiVersion: apisix.apache.org/v1
+kind: ApisixRoute
+metadata:
+ name: httpbin-route
+spec:
+ rules:
+ - host: httpbin.com
+   http:
+     paths:
+     - backend:
+         serviceName: %s
+         servicePort: %d
+       path: /ip
+`, backendSvc, backendSvcPort[0])
+               assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(apisixRoute))
+               assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixUpstreamsCreated(1), "checking number of upstreams")
+               s.NewAPISIXClient().GET("/ip").WithHeader("Host", "httpbin.com").Expect().Status(http.StatusOK)
+
+               // Now delete the backend httpbin service resource.
+               assert.Nil(ginkgo.GinkgoT(), s.DeleteHTTPBINService())
+               time.Sleep(3 * time.Second)
+               s.NewAPISIXClient().GET("/ip").WithHeader("Host", "httpbin.com").Expect().Status(http.StatusBadGateway)
+       })
 })
diff --git a/test/e2e/scaffold/httpbin.go b/test/e2e/scaffold/httpbin.go
index eab3a1e..41701d6 100644
--- a/test/e2e/scaffold/httpbin.go
+++ b/test/e2e/scaffold/httpbin.go
@@ -116,6 +116,14 @@ func (s *Scaffold) ScaleHTTPBIN(desired int) error {
        return nil
 }
 
+// DeleteHTTPBINService deletes the HTTPBIN service object.
+func (s *Scaffold) DeleteHTTPBINService() error {
+       if err := k8s.KubectlDeleteFromStringE(s.t, s.kubectlOptions, _httpService); err != nil {
+               return err
+       }
+       return nil
+}
+
 // WaitAllHTTPBINPods waits until all httpbin pods ready.
 func (s *Scaffold) WaitAllHTTPBINPoddsAvailable() error {
        opts := metav1.ListOptions{

Reply via email to