This is an automated email from the ASF dual-hosted git repository.
tokers pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push:
new 8bcbc60 chore: refactor endpoints controller and fix the bug in
endpoints deletion (#216)
8bcbc60 is described below
commit 8bcbc60a0fc392c735e7a0dd64c581c91729678f
Author: Alex Zhang <[email protected]>
AuthorDate: Fri Jan 29 16:37:16 2021 +0800
chore: refactor endpoints controller and fix the bug in endpoints deletion
(#216)
---
pkg/ingress/controller/controller.go | 19 ++-
pkg/ingress/controller/endpoint.go | 140 ++++++++++++---------
pkg/ingress/controller/{watch.go => types.go} | 0
.../controller/watch.go => types/event.go} | 35 +++++-
.../store/store.go => types/event_test.go} | 23 +++-
test/e2e/endpoints/endpoints.go | 28 +++++
test/e2e/scaffold/httpbin.go | 8 ++
7 files changed, 181 insertions(+), 72 deletions(-)
diff --git a/pkg/ingress/controller/controller.go
b/pkg/ingress/controller/controller.go
index 4b86a60..fa2d8c0 100644
--- a/pkg/ingress/controller/controller.go
+++ b/pkg/ingress/controller/controller.go
@@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
+ listerscorev1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
@@ -64,6 +65,10 @@ type Controller struct {
crdController *Api6Controller
crdInformerFactory externalversions.SharedInformerFactory
+ // informers and listers
+ epInformer cache.SharedIndexInformer
+ epLister listerscorev1.EndpointsLister
+
endpointsController *endpointsController
}
@@ -112,9 +117,12 @@ func NewController(cfg *config.Config) (*Controller,
error) {
crdClientset: crdClientset,
crdInformerFactory: sharedInformerFactory,
watchingNamespace: watchingNamespace,
+
+ epInformer:
kube.CoreSharedInformerFactory.Core().V1().Endpoints().Informer(),
+ epLister:
kube.CoreSharedInformerFactory.Core().V1().Endpoints().Lister(),
}
- c.endpointsController =
c.newEndpointsController(kube.CoreSharedInformerFactory)
+ c.endpointsController = c.newEndpointsController(c.epInformer,
c.epLister)
return c, nil
}
@@ -219,6 +227,7 @@ func (c *Controller) run(ctx context.Context) {
log.Errorf("failed to add default cluster: %s", err)
return
}
+
if err := c.apisix.Cluster("").HasSynced(ctx); err != nil {
// TODO give up the leader role.
log.Errorf("failed to wait the default cluster to be ready:
%s", err)
@@ -226,9 +235,11 @@ func (c *Controller) run(ctx context.Context) {
}
c.goAttach(func() {
- if err := c.endpointsController.run(ctx); err != nil {
- log.Errorf("failed to run endpoints controller: %s",
err.Error())
- }
+ c.epInformer.Run(ctx.Done())
+ })
+
+ c.goAttach(func() {
+ c.endpointsController.run(ctx)
})
ac := &Api6Controller{
diff --git a/pkg/ingress/controller/endpoint.go
b/pkg/ingress/controller/endpoint.go
index c7befd0..6b55294 100644
--- a/pkg/ingress/controller/endpoint.go
+++ b/pkg/ingress/controller/endpoint.go
@@ -16,13 +16,10 @@ package controller
import (
"context"
- "errors"
"fmt"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
- k8serrors "k8s.io/apimachinery/pkg/api/errors"
- "k8s.io/client-go/informers"
listerscorev1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
@@ -31,11 +28,14 @@ import (
apisixcache "github.com/api7/ingress-controller/pkg/apisix/cache"
"github.com/api7/ingress-controller/pkg/log"
"github.com/api7/ingress-controller/pkg/seven/state"
+ "github.com/api7/ingress-controller/pkg/types"
apisixv1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
)
const (
_defaultNodeWeight = 100
+ // _maxRetries is the number of times an object will be retried before
it is dropped out of the queue.
+ _maxRetries = 10
)
type endpointsController struct {
@@ -43,14 +43,16 @@ type endpointsController struct {
informer cache.SharedIndexInformer
lister listerscorev1.EndpointsLister
workqueue workqueue.RateLimitingInterface
+ workers int
}
-func (c *Controller) newEndpointsController(factory
informers.SharedInformerFactory) *endpointsController {
+func (c *Controller) newEndpointsController(informer
cache.SharedIndexInformer, lister listerscorev1.EndpointsLister)
*endpointsController {
ctl := &endpointsController{
controller: c,
- informer: factory.Core().V1().Endpoints().Informer(),
- lister: factory.Core().V1().Endpoints().Lister(),
+ informer: informer,
+ lister: lister,
workqueue:
workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(),
"endpoints"),
+ workers: 1,
}
ctl.informer.AddEventHandler(
@@ -64,71 +66,49 @@ func (c *Controller) newEndpointsController(factory
informers.SharedInformerFact
return ctl
}
-func (c *endpointsController) run(ctx context.Context) error {
+func (c *endpointsController) run(ctx context.Context) {
log.Info("endpoints controller started")
defer log.Info("endpoints controller exited")
- go func() {
- c.informer.Run(ctx.Done())
- }()
-
if ok := cache.WaitForCacheSync(ctx.Done(), c.informer.HasSynced); !ok {
- return errors.New("endpoints informers cache sync failed")
+ log.Error("informers sync failed")
+ return
}
- for {
- obj, shutdown := c.workqueue.Get()
- if shutdown {
- return nil
- }
-
- var (
- err error
- )
-
- key, ok := obj.(string)
- if !ok {
- log.Errorf("found endpoints object with unexpected type
%T, ignore it", obj)
- c.workqueue.Forget(obj)
- } else {
- err = c.process(ctx, key)
- }
-
- c.workqueue.Done(obj)
+ handler := func() {
+ for {
+ obj, shutdown := c.workqueue.Get()
+ if shutdown {
+ return
+ }
- if err != nil {
- log.Warnf("endpoints %s retried since %s", key, err)
- c.retry(obj)
+ err := c.sync(ctx, obj.(*types.Event))
+ c.workqueue.Done(obj)
+ c.handleSyncErr(obj, err)
}
}
-}
-func (c *endpointsController) process(ctx context.Context, key string) error {
- namespace, name, err := cache.SplitMetaNamespaceKey(key)
- if err != nil {
- log.Errorf("found endpoints objects with malformed
namespace/name: %s, ignore it", err)
- return nil
+ for i := 0; i < c.workers; i++ {
+ go handler()
}
- ep, err := c.lister.Endpoints(namespace).Get(name)
- if err != nil {
- if k8serrors.IsNotFound(err) {
- log.Warnf("endpoints %s was removed before it can be
processed", key)
- return nil
- }
- log.Errorf("failed to get endpoints %s: %s", key, err)
- return err
- }
- return c.sync(ctx, ep)
+ <-ctx.Done()
+ c.workqueue.ShutDown()
}
-func (c *endpointsController) sync(ctx context.Context, ep *corev1.Endpoints)
error {
+func (c *endpointsController) sync(ctx context.Context, ev *types.Event) error
{
+ ep := ev.Object.(*corev1.Endpoints)
clusters := c.controller.apisix.ListClusters()
for _, s := range ep.Subsets {
for _, port := range s.Ports {
+ // FIXME this is wrong, we should use the port name as
the key.
upstream := fmt.Sprintf("%s_%s_%d", ep.Namespace,
ep.Name, port.Port)
for _, cluster := range clusters {
- if err := c.syncToCluster(ctx, upstream,
cluster, s.Addresses, int(port.Port)); err != nil {
+ var addresses []corev1.EndpointAddress
+ if ev.Type != types.EventDelete {
+ addresses = s.Addresses
+ }
+ if err := c.syncToCluster(ctx, upstream,
cluster, addresses, int(port.Port)); err != nil {
return err
}
}
@@ -186,8 +166,18 @@ func (c *endpointsController) syncToCluster(ctx
context.Context, upstreamName st
return nil
}
-func (c *endpointsController) retry(obj interface{}) {
- c.workqueue.AddRateLimited(obj)
+func (c *endpointsController) handleSyncErr(obj interface{}, err error) {
+ if err == nil {
+ c.workqueue.Forget(obj)
+ return
+ }
+ if c.workqueue.NumRequeues(obj) < _maxRetries {
+ log.Infof("sync endpoints %+v failed, will retry", obj)
+ c.workqueue.AddRateLimited(obj)
+ } else {
+ c.workqueue.Forget(obj)
+ log.Warnf("drop endpoints %+v out of the queue", obj)
+ }
}
func (c *endpointsController) onAdd(obj interface{}) {
@@ -199,7 +189,11 @@ func (c *endpointsController) onAdd(obj interface{}) {
if !c.controller.namespaceWatching(key) {
return
}
- c.workqueue.AddRateLimited(key)
+
+ c.workqueue.AddRateLimited(&types.Event{
+ Type: types.EventAdd,
+ Object: obj,
+ })
}
func (c *endpointsController) onUpdate(prev, curr interface{}) {
@@ -209,17 +203,39 @@ func (c *endpointsController) onUpdate(prev, curr
interface{}) {
if prevEp.GetResourceVersion() == currEp.GetResourceVersion() {
return
}
- c.onAdd(currEp)
-}
-
-func (c *endpointsController) onDelete(obj interface{}) {
- key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+ key, err := cache.MetaNamespaceKeyFunc(currEp)
if err != nil {
- log.Errorf("failed to find the final state before deletion:
%s", err)
+ log.Errorf("found endpoints object with bad namespace/name: %s,
ignore it", err)
return
}
if !c.controller.namespaceWatching(key) {
return
}
- c.workqueue.AddRateLimited(key)
+ c.workqueue.AddRateLimited(&types.Event{
+ Type: types.EventUpdate,
+ Object: curr,
+ })
+}
+
+func (c *endpointsController) onDelete(obj interface{}) {
+ ep, ok := obj.(*corev1.Endpoints)
+ if !ok {
+ tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
+ if !ok {
+ log.Errorf("found endpoints: %+v in bad tombstone
state", obj)
+ return
+ }
+ ep = tombstone.Obj.(*corev1.Endpoints)
+ }
+
+ // FIXME Refactor Controller.namespaceWatching to just use
+ // namespace after all controllers use the same way to fetch
+ // the object.
+ if !c.controller.namespaceWatching(ep.Namespace + "/" + ep.Name) {
+ return
+ }
+ c.workqueue.AddRateLimited(&types.Event{
+ Type: types.EventDelete,
+ Object: ep,
+ })
}
diff --git a/pkg/ingress/controller/watch.go b/pkg/ingress/controller/types.go
similarity index 100%
copy from pkg/ingress/controller/watch.go
copy to pkg/ingress/controller/types.go
diff --git a/pkg/ingress/controller/watch.go b/pkg/types/event.go
similarity index 61%
rename from pkg/ingress/controller/watch.go
rename to pkg/types/event.go
index d857ef6..7535ba5 100644
--- a/pkg/ingress/controller/watch.go
+++ b/pkg/types/event.go
@@ -12,11 +12,36 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-package controller
+
+package types
+
+// EventType is the type of event.
+type EventType int
const (
- ADD = "ADD"
- UPDATE = "UPDATE"
- DELETE = "DELETE"
- WatchFromKind = "watch"
+ // EventAdd means an add event.
+ EventAdd = iota + 1
+ // EventUpdate means an update event.
+ EventUpdate
+ // EventDelete means a delete event.
+ EventDelete
)
+
+func (ev EventType) String() string {
+ switch ev {
+ case EventAdd:
+ return "add"
+ case EventUpdate:
+ return "update"
+ case EventDelete:
+ return "delete"
+ default:
+ return "unknown"
+ }
+}
+
+// Event represents a typed event.
+type Event struct {
+ Type EventType
+ Object interface{}
+}
diff --git a/pkg/ingress/controller/store/store.go b/pkg/types/event_test.go
similarity index 68%
rename from pkg/ingress/controller/store/store.go
rename to pkg/types/event_test.go
index 6d7a394..cd009e7 100644
--- a/pkg/ingress/controller/store/store.go
+++ b/pkg/types/event_test.go
@@ -12,4 +12,25 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-package store
+
+package types
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestEvent(t *testing.T) {
+ ev := Event{}
+ assert.Equal(t, ev.Type.String(), "unknown")
+
+ ev.Type = EventAdd
+ assert.Equal(t, ev.Type.String(), "add")
+
+ ev.Type = EventDelete
+ assert.Equal(t, ev.Type.String(), "delete")
+
+ ev.Type = EventUpdate
+ assert.Equal(t, ev.Type.String(), "update")
+}
diff --git a/test/e2e/endpoints/endpoints.go b/test/e2e/endpoints/endpoints.go
index 39921df..8ada318 100644
--- a/test/e2e/endpoints/endpoints.go
+++ b/test/e2e/endpoints/endpoints.go
@@ -16,6 +16,7 @@ package endpoints
import (
"fmt"
+ "net/http"
"time"
"github.com/api7/ingress-controller/test/e2e/scaffold"
@@ -43,4 +44,31 @@ spec:
assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(ups))
assert.Nil(ginkgo.GinkgoT(),
s.EnsureNumApisixUpstreamsCreated(1), "checking number of upstreams")
})
+
+ ginkgo.It("upstream nodes should be reset to empty when
Service/Endpoints was deleted", func() {
+ backendSvc, backendSvcPort := s.DefaultHTTPBackend()
+ apisixRoute := fmt.Sprintf(`
+apiVersion: apisix.apache.org/v1
+kind: ApisixRoute
+metadata:
+ name: httpbin-route
+spec:
+ rules:
+ - host: httpbin.com
+ http:
+ paths:
+ - backend:
+ serviceName: %s
+ servicePort: %d
+ path: /ip
+`, backendSvc, backendSvcPort[0])
+ assert.Nil(ginkgo.GinkgoT(),
s.CreateResourceFromString(apisixRoute))
+ assert.Nil(ginkgo.GinkgoT(),
s.EnsureNumApisixUpstreamsCreated(1), "checking number of upstreams")
+ s.NewAPISIXClient().GET("/ip").WithHeader("Host",
"httpbin.com").Expect().Status(http.StatusOK)
+
+ // Now delete the backend httpbin service resource.
+ assert.Nil(ginkgo.GinkgoT(), s.DeleteHTTPBINService())
+ time.Sleep(3 * time.Second)
+ s.NewAPISIXClient().GET("/ip").WithHeader("Host",
"httpbin.com").Expect().Status(http.StatusBadGateway)
+ })
})
diff --git a/test/e2e/scaffold/httpbin.go b/test/e2e/scaffold/httpbin.go
index eab3a1e..41701d6 100644
--- a/test/e2e/scaffold/httpbin.go
+++ b/test/e2e/scaffold/httpbin.go
@@ -116,6 +116,14 @@ func (s *Scaffold) ScaleHTTPBIN(desired int) error {
return nil
}
+// DeleteHTTPBINService deletes the HTTPBIN service object.
+func (s *Scaffold) DeleteHTTPBINService() error {
+ if err := k8s.KubectlDeleteFromStringE(s.t, s.kubectlOptions,
_httpService); err != nil {
+ return err
+ }
+ return nil
+}
+
// WaitAllHTTPBINPods waits until all httpbin pods ready.
func (s *Scaffold) WaitAllHTTPBINPoddsAvailable() error {
opts := metav1.ListOptions{