This is an automated email from the ASF dual-hosted git repository.

kvn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git


The following commit(s) were added to refs/heads/master by this push:
     new 67f3fd9  chore: endpointslice controller (#574)
67f3fd9 is described below

commit 67f3fd934b8a8b935440227a5c8ba7923ba91a2a
Author: Alex Zhang <[email protected]>
AuthorDate: Sat Jul 10 22:31:29 2021 +0800

    chore: endpointslice controller (#574)
---
 README.md                                        |   2 +-
 cmd/ingress/ingress.go                           |   1 +
 conf/config-default.yaml                         |   1 +
 docs/en/latest/FAQ.md                            |   2 +-
 pkg/ingress/controller.go                        | 103 ++++++++++-
 pkg/ingress/endpoint.go                          |  84 +--------
 pkg/ingress/endpointslice.go                     | 210 +++++++++++++++++++++++
 pkg/kube/endpoint.go                             |   9 +-
 pkg/kube/translation/translator_test.go          | 133 +++++++++++++-
 samples/deploy/rbac/apisix_view_clusterrole.yaml |   8 +
 test/e2e/scaffold/ingress.go                     |   9 +
 11 files changed, 470 insertions(+), 92 deletions(-)

diff --git a/README.md b/README.md
index 22b06db..df46172 100644
--- a/README.md
+++ b/README.md
@@ -58,7 +58,7 @@ This project is currently general availability.
 
 ## Prerequisites
 
-Apisix ingress controller requires Kubernetes version 1.14+.
+Apisix ingress controller requires Kubernetes version 1.15+.
 
 ## Apache APISIX Ingress vs. Kubernetes Ingress Nginx
 
diff --git a/cmd/ingress/ingress.go b/cmd/ingress/ingress.go
index df931ef..de22508 100644
--- a/cmd/ingress/ingress.go
+++ b/cmd/ingress/ingress.go
@@ -147,6 +147,7 @@ the apisix cluster and others are created`,
        cmd.PersistentFlags().StringVar(&cfg.Kubernetes.ElectionID, 
"election-id", config.IngressAPISIXLeader, "election id used for campaign the 
controller leader")
        cmd.PersistentFlags().StringVar(&cfg.Kubernetes.IngressVersion, 
"ingress-version", config.IngressNetworkingV1, "the supported ingress api group 
version, can be \"networking/v1beta1\", \"networking/v1\" (for Kubernetes 
version v1.19.0 or higher) and \"extensions/v1beta1\"")
        cmd.PersistentFlags().StringVar(&cfg.Kubernetes.ApisixRouteVersion, 
"apisix-route-version", config.ApisixRouteV2alpha1, "the supported apisixroute 
api group version, can be \"apisix.apache.org/v1\" or 
\"apisix.apache.org/v2alpha1\"")
+       cmd.PersistentFlags().BoolVar(&cfg.Kubernetes.WatchEndpointSlices, 
"watch-endpointslices", false, "whether to watch endpointslices rather than 
endpoints")
        cmd.PersistentFlags().StringVar(&cfg.APISIX.BaseURL, "apisix-base-url", 
"", "the base URL for APISIX admin api / manager api (deprecated, using 
--default-apisix-cluster-base-url instead)")
        cmd.PersistentFlags().StringVar(&cfg.APISIX.AdminKey, 
"apisix-admin-key", "", "admin key used for the authorization of APISIX admin 
api / manager api (deprecated, using --default-apisix-cluster-admin-key 
instead)")
        cmd.PersistentFlags().StringVar(&cfg.APISIX.DefaultClusterBaseURL, 
"default-apisix-cluster-base-url", "", "the base URL of admin api / manager api 
for the default APISIX cluster")
diff --git a/conf/config-default.yaml b/conf/config-default.yaml
index 78c4f5d..b6ec625 100644
--- a/conf/config-default.yaml
+++ b/conf/config-default.yaml
@@ -50,6 +50,7 @@ kubernetes:
   ingress_version: "networking/v1"     # the supported ingress api group 
version, can be "networking/v1beta1"
                                        # , "networking/v1" (for Kubernetes 
version v1.19.0 or higher), and
                                        # "extensions/v1beta1", default is 
"networking/v1".
+  watch_endpointslices: false          # whether to watch EndpointSlices 
rather than Endpoints.
 
   apisix_route_version: "apisix.apache.org/v2alpha1" # the supported 
apisixroute api group version, can be
                                                      # "apisix.apache.org/v1" 
or "apisix.apache.org/v2alpha1",
diff --git a/docs/en/latest/FAQ.md b/docs/en/latest/FAQ.md
index 7c92f9a..8786def 100644
--- a/docs/en/latest/FAQ.md
+++ b/docs/en/latest/FAQ.md
@@ -49,7 +49,7 @@ Tips: The failure caused by empty upstream nodes is a 
limitation of Apache APISI
 
 6. What is the retry rule of `apisix-ingress-controller`?
 
-If an error occurs during the process of `apisix-ingress-controller` parsing 
CRD and distributing the configuration to APISIX, a retry will be triggered.
+If an error occurs duriREADME.mdng the process of `apisix-ingress-controller` 
parsing CRD and distributing the configuration to APISIX, a retry will be 
triggered.
 
 The delayed retry method is adopted. After the first failure, it is retried 
once per second. After 5 retries are triggered, the slow retry strategy will be 
enabled, and the retry will be performed every 1 minute until it succeeds.
 
diff --git a/pkg/ingress/controller.go b/pkg/ingress/controller.go
index 7694f0b..3c3d566 100644
--- a/pkg/ingress/controller.go
+++ b/pkg/ingress/controller.go
@@ -21,8 +21,11 @@ import (
        "sync"
        "time"
 
+       apisixcache 
"github.com/apache/apisix-ingress-controller/pkg/apisix/cache"
+       configv1 
"github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
        "go.uber.org/zap"
        v1 "k8s.io/api/core/v1"
+       k8serrors "k8s.io/apimachinery/pkg/api/errors"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/apimachinery/pkg/runtime"
        utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -107,10 +110,11 @@ type Controller struct {
        apisixConsumerLister        listersv2alpha1.ApisixConsumerLister
 
        // resource controllers
-       podController       *podController
-       endpointsController *endpointsController
-       ingressController   *ingressController
-       secretController    *secretController
+       podController           *podController
+       endpointsController     *endpointsController
+       endpointSliceController *endpointSliceController
+       ingressController       *ingressController
+       secretController        *secretController
 
        apisixUpstreamController      *apisixUpstreamController
        apisixRouteController         *apisixRouteController
@@ -237,8 +241,12 @@ func (c *Controller) initWhenStartLeading() {
        c.apisixTlsInformer = 
apisixFactory.Apisix().V1().ApisixTlses().Informer()
        c.apisixConsumerInformer = 
apisixFactory.Apisix().V2alpha1().ApisixConsumers().Informer()
 
+       if c.cfg.Kubernetes.WatchEndpointSlices {
+               c.endpointSliceController = c.newEndpointSliceController()
+       } else {
+               c.endpointsController = c.newEndpointsController()
+       }
        c.podController = c.newPodController()
-       c.endpointsController = c.newEndpointsController()
        c.apisixUpstreamController = c.newApisixUpstreamController()
        c.ingressController = c.newIngressController()
        c.apisixRouteController = c.newApisixRouteController()
@@ -429,7 +437,11 @@ func (c *Controller) run(ctx context.Context) {
                c.podController.run(ctx)
        })
        c.goAttach(func() {
-               c.endpointsController.run(ctx)
+               if c.cfg.Kubernetes.WatchEndpointSlices {
+                       c.endpointSliceController.run(ctx)
+               } else {
+                       c.endpointsController.run(ctx)
+               }
        })
        c.goAttach(func() {
                c.apisixUpstreamController.run(ctx)
@@ -508,6 +520,85 @@ func (c *Controller) syncConsumer(ctx context.Context, 
consumer *apisixv1.Consum
        }
        return
 }
+
+func (c *Controller) syncEndpoint(ctx context.Context, ep kube.Endpoint) error 
{
+       namespace := ep.Namespace()
+       svcName := ep.ServiceName()
+       svc, err := c.svcLister.Services(ep.Namespace()).Get(svcName)
+       if err != nil {
+               if k8serrors.IsNotFound(err) {
+                       log.Infof("service %s/%s not found", ep.Namespace(), 
svcName)
+                       return nil
+               }
+               log.Errorf("failed to get service %s/%s: %s", ep.Namespace(), 
svcName, err)
+               return err
+       }
+       var subsets []configv1.ApisixUpstreamSubset
+       subsets = append(subsets, configv1.ApisixUpstreamSubset{})
+       au, err := 
c.apisixUpstreamLister.ApisixUpstreams(namespace).Get(svcName)
+       if err != nil {
+               if !k8serrors.IsNotFound(err) {
+                       log.Errorf("failed to get ApisixUpstream %s/%s: %s", 
ep.Namespace(), svcName, err)
+                       return err
+               }
+       } else if len(au.Spec.Subsets) > 0 {
+               subsets = append(subsets, au.Spec.Subsets...)
+       }
+
+       clusters := c.apisix.ListClusters()
+       for _, port := range svc.Spec.Ports {
+               for _, subset := range subsets {
+                       nodes, err := c.translator.TranslateUpstreamNodes(ep, 
port.Port, subset.Labels)
+                       if err != nil {
+                               log.Errorw("failed to translate upstream nodes",
+                                       zap.Error(err),
+                                       zap.Any("endpoints", ep),
+                                       zap.Int32("port", port.Port),
+                               )
+                       }
+                       name := apisixv1.ComposeUpstreamName(namespace, 
svcName, subset.Name, port.Port)
+                       for _, cluster := range clusters {
+                               if err := 
c.syncUpstreamNodesChangeToCluster(ctx, cluster, nodes, name); err != nil {
+                                       return err
+                               }
+                       }
+               }
+       }
+       return nil
+}
+
+func (c *Controller) syncUpstreamNodesChangeToCluster(ctx context.Context, 
cluster apisix.Cluster, nodes apisixv1.UpstreamNodes, upsName string) error {
+       upstream, err := cluster.Upstream().Get(ctx, upsName)
+       if err != nil {
+               if err == apisixcache.ErrNotFound {
+                       log.Warnw("upstream is not referenced",
+                               zap.String("cluster", cluster.String()),
+                               zap.String("upstream", upsName),
+                       )
+                       return nil
+               } else {
+                       log.Errorw("failed to get upstream",
+                               zap.String("upstream", upsName),
+                               zap.String("cluster", cluster.String()),
+                               zap.Error(err),
+                       )
+                       return err
+               }
+       }
+
+       upstream.Nodes = nodes
+
+       log.Debugw("upstream binds new nodes",
+               zap.Any("upstream", upstream),
+               zap.String("cluster", cluster.String()),
+       )
+
+       updated := &manifest{
+               upstreams: []*apisixv1.Upstream{upstream},
+       }
+       return c.syncManifests(ctx, nil, updated, nil)
+}
+
 func (c *Controller) checkClusterHealth(ctx context.Context, cancelFunc 
context.CancelFunc) {
        defer cancelFunc()
        for {
diff --git a/pkg/ingress/endpoint.go b/pkg/ingress/endpoint.go
index fced1dd..c2eb236 100644
--- a/pkg/ingress/endpoint.go
+++ b/pkg/ingress/endpoint.go
@@ -16,21 +16,16 @@ package ingress
 
 import (
        "context"
-       "github.com/apache/apisix-ingress-controller/pkg/kube"
        "time"
 
        "go.uber.org/zap"
        corev1 "k8s.io/api/core/v1"
-       k8serrors "k8s.io/apimachinery/pkg/api/errors"
        "k8s.io/client-go/tools/cache"
        "k8s.io/client-go/util/workqueue"
 
-       "github.com/apache/apisix-ingress-controller/pkg/apisix"
-       apisixcache 
"github.com/apache/apisix-ingress-controller/pkg/apisix/cache"
-       configv1 
"github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
+       "github.com/apache/apisix-ingress-controller/pkg/kube"
        "github.com/apache/apisix-ingress-controller/pkg/log"
        "github.com/apache/apisix-ingress-controller/pkg/types"
-       apisixv1 
"github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
 )
 
 type endpointsController struct {
@@ -89,82 +84,7 @@ func (c *endpointsController) run(ctx context.Context) {
 
 func (c *endpointsController) sync(ctx context.Context, ev *types.Event) error 
{
        ep := ev.Object.(kube.Endpoint)
-       namespace := ep.Namespace()
-       svcName := ep.ServiceName()
-       svc, err := c.controller.svcLister.Services(ep.Namespace()).Get(svcName)
-       if err != nil {
-               if k8serrors.IsNotFound(err) {
-                       log.Infof("service %s/%s not found", ep.Namespace(), 
svcName)
-                       return nil
-               }
-               log.Errorf("failed to get service %s/%s: %s", ep.Namespace(), 
svcName, err)
-               return err
-       }
-       var subsets []configv1.ApisixUpstreamSubset
-       subsets = append(subsets, configv1.ApisixUpstreamSubset{})
-       au, err := 
c.controller.apisixUpstreamLister.ApisixUpstreams(namespace).Get(svcName)
-       if err != nil {
-               if !k8serrors.IsNotFound(err) {
-                       log.Errorf("failed to get ApisixUpstream %s/%s: %s", 
ep.Namespace(), svcName, err)
-                       return err
-               }
-       } else if len(au.Spec.Subsets) > 0 {
-               subsets = append(subsets, au.Spec.Subsets...)
-       }
-
-       clusters := c.controller.apisix.ListClusters()
-       for _, port := range svc.Spec.Ports {
-               for _, subset := range subsets {
-                       nodes, err := 
c.controller.translator.TranslateUpstreamNodes(ep, port.Port, subset.Labels)
-                       if err != nil {
-                               log.Errorw("failed to translate upstream nodes",
-                                       zap.Error(err),
-                                       zap.Any("endpoints", ep),
-                                       zap.Int32("port", port.Port),
-                               )
-                       }
-                       name := apisixv1.ComposeUpstreamName(namespace, 
svcName, subset.Name, port.Port)
-                       for _, cluster := range clusters {
-                               if err := c.syncToCluster(ctx, cluster, nodes, 
name); err != nil {
-                                       return err
-                               }
-                       }
-               }
-       }
-
-       return nil
-}
-
-func (c *endpointsController) syncToCluster(ctx context.Context, cluster 
apisix.Cluster, nodes apisixv1.UpstreamNodes, upsName string) error {
-       upstream, err := cluster.Upstream().Get(ctx, upsName)
-       if err != nil {
-               if err == apisixcache.ErrNotFound {
-                       log.Warnw("upstream is not referenced",
-                               zap.String("cluster", cluster.String()),
-                               zap.String("upstream", upsName),
-                       )
-                       return nil
-               } else {
-                       log.Errorw("failed to get upstream",
-                               zap.String("upstream", upsName),
-                               zap.String("cluster", cluster.String()),
-                               zap.Error(err),
-                       )
-                       return err
-               }
-       }
-
-       upstream.Nodes = nodes
-
-       log.Debugw("upstream binds new nodes",
-               zap.Any("upstream", upstream),
-               zap.String("cluster", cluster.String()),
-       )
-
-       updated := &manifest{
-               upstreams: []*apisixv1.Upstream{upstream},
-       }
-       return c.controller.syncManifests(ctx, nil, updated, nil)
+       return c.controller.syncEndpoint(ctx, ep)
 }
 
 func (c *endpointsController) handleSyncErr(obj interface{}, err error) {
diff --git a/pkg/ingress/endpointslice.go b/pkg/ingress/endpointslice.go
index ea5119c..c8eaa05 100644
--- a/pkg/ingress/endpointslice.go
+++ b/pkg/ingress/endpointslice.go
@@ -13,3 +13,213 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 package ingress
+
+import (
+       "context"
+       "time"
+
+       "go.uber.org/zap"
+       discoveryv1 "k8s.io/api/discovery/v1"
+       "k8s.io/client-go/tools/cache"
+       "k8s.io/client-go/util/workqueue"
+
+       "github.com/apache/apisix-ingress-controller/pkg/log"
+       "github.com/apache/apisix-ingress-controller/pkg/types"
+)
+
+const (
+       _endpointSlicesManagedBy = "endpointslice-controller.k8s.io"
+)
+
+type endpointSliceEvent struct {
+       Key         string
+       ServiceName string
+}
+
+type endpointSliceController struct {
+       controller *Controller
+       workqueue  workqueue.RateLimitingInterface
+       workers    int
+}
+
+func (c *Controller) newEndpointSliceController() *endpointSliceController {
+       ctl := &endpointSliceController{
+               controller: c,
+               workqueue:  
workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(time.Second,
 60*time.Second, 5), "endpointSlice"),
+               workers:    1,
+       }
+
+       ctl.controller.epInformer.AddEventHandler(
+               cache.ResourceEventHandlerFuncs{
+                       AddFunc:    ctl.onAdd,
+                       UpdateFunc: ctl.onUpdate,
+                       DeleteFunc: ctl.onDelete,
+               },
+       )
+
+       return ctl
+}
+
+func (c *endpointSliceController) run(ctx context.Context) {
+       log.Info("endpointSlice controller started")
+       defer log.Info("endpointSlice controller exited")
+       defer c.workqueue.ShutDown()
+
+       if ok := cache.WaitForCacheSync(ctx.Done(), 
c.controller.epInformer.HasSynced); !ok {
+               log.Error("informers sync failed")
+               return
+       }
+
+       handler := func() {
+               for {
+                       obj, shutdown := c.workqueue.Get()
+                       if shutdown {
+                               return
+                       }
+
+                       err := c.sync(ctx, obj.(*types.Event))
+                       c.workqueue.Done(obj)
+                       c.handleSyncErr(obj, err)
+               }
+       }
+
+       for i := 0; i < c.workers; i++ {
+               go handler()
+       }
+
+       <-ctx.Done()
+}
+
+func (c *endpointSliceController) sync(ctx context.Context, ev *types.Event) 
error {
+       epEvent := ev.Object.(endpointSliceEvent)
+       namespace, _, err := cache.SplitMetaNamespaceKey(epEvent.Key)
+       if err != nil {
+               log.Errorf("found endpointSlice object with bad namespace/name: 
%s, ignore it", epEvent.Key)
+               return nil
+       }
+       ep, err := c.controller.epLister.GetEndpointSlices(namespace, 
epEvent.ServiceName)
+       if err != nil {
+               log.Errorf("failed to get all endpointSlices for service %s: 
%s",
+                       epEvent.ServiceName, err)
+               return err
+       }
+       return c.controller.syncEndpoint(ctx, ep)
+}
+
+func (c *endpointSliceController) handleSyncErr(obj interface{}, err error) {
+       if err == nil {
+               c.workqueue.Forget(obj)
+               return
+       }
+       log.Warnw("sync endpointSlice failed, will retry",
+               zap.Any("object", obj),
+       )
+       c.workqueue.AddRateLimited(obj)
+}
+
+func (c *endpointSliceController) onAdd(obj interface{}) {
+       key, err := cache.MetaNamespaceKeyFunc(obj)
+       if err != nil {
+               log.Errorf("found endpointSlice object with bad namespace")
+       }
+       if !c.controller.namespaceWatching(key) {
+               return
+       }
+       ep := obj.(*discoveryv1.EndpointSlice)
+       svcName := ep.Labels[discoveryv1.LabelServiceName]
+       if svcName == "" {
+               return
+       }
+       if ep.Labels[discoveryv1.LabelManagedBy] != _endpointSlicesManagedBy {
+               // We only care about endpointSlice objects managed by the 
EndpointSlices
+               // controller.
+               return
+       }
+
+       log.Debugw("endpointSlice add event arrived",
+               zap.String("object-key", key),
+       )
+
+       c.workqueue.AddRateLimited(&types.Event{
+               Type: types.EventAdd,
+               Object: endpointSliceEvent{
+                       Key:         key,
+                       ServiceName: svcName,
+               },
+       })
+}
+
+func (c *endpointSliceController) onUpdate(prev, curr interface{}) {
+       prevEp := prev.(*discoveryv1.EndpointSlice)
+       currEp := curr.(*discoveryv1.EndpointSlice)
+
+       if prevEp.GetResourceVersion() == currEp.GetResourceVersion() {
+               return
+       }
+       key, err := cache.MetaNamespaceKeyFunc(currEp)
+       if err != nil {
+               log.Errorf("found endpointSlice object with bad namespace/name: 
%s, ignore it", err)
+               return
+       }
+       if !c.controller.namespaceWatching(key) {
+               return
+       }
+       if currEp.Labels[discoveryv1.LabelManagedBy] != 
_endpointSlicesManagedBy {
+               // We only care about endpointSlice objects managed by the 
EndpointSlices
+               // controller.
+               return
+       }
+       svcName := currEp.Labels[discoveryv1.LabelServiceName]
+       if svcName == "" {
+               return
+       }
+
+       log.Debugw("endpointSlice update event arrived",
+               zap.Any("new object", currEp),
+               zap.Any("old object", prevEp),
+       )
+       c.workqueue.AddRateLimited(&types.Event{
+               Type: types.EventUpdate,
+               // TODO pass key.
+               Object: endpointSliceEvent{
+                       Key:         key,
+                       ServiceName: svcName,
+               },
+       })
+}
+
+func (c *endpointSliceController) onDelete(obj interface{}) {
+       ep, ok := obj.(*discoveryv1.EndpointSlice)
+       if !ok {
+               tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
+               if !ok {
+                       log.Errorf("found endpoints: %+v in bad tombstone 
state", obj)
+                       return
+               }
+               ep = tombstone.Obj.(*discoveryv1.EndpointSlice)
+       }
+       key, err := cache.MetaNamespaceKeyFunc(obj)
+       if err != nil {
+               log.Errorf("found endpointSlice object with bad namespace/name: 
%s, ignore it", err)
+               return
+       }
+       if !c.controller.namespaceWatching(key) {
+               return
+       }
+       if ep.Labels[discoveryv1.LabelManagedBy] != _endpointSlicesManagedBy {
+               // We only care about endpointSlice objects managed by the 
EndpointSlices
+               // controller.
+               return
+       }
+       svcName := ep.Labels[discoveryv1.LabelServiceName]
+       log.Debugw("endpoints delete event arrived",
+               zap.Any("object-key", key),
+       )
+       c.workqueue.AddRateLimited(&types.Event{
+               Type: types.EventDelete,
+               Object: endpointSliceEvent{
+                       Key:         key,
+                       ServiceName: svcName,
+               },
+       })
+}
diff --git a/pkg/kube/endpoint.go b/pkg/kube/endpoint.go
index 27eba84..995a73c 100644
--- a/pkg/kube/endpoint.go
+++ b/pkg/kube/endpoint.go
@@ -59,7 +59,7 @@ func (lister *endpointLister) GetEndpoint(namespace, name 
string) (Endpoint, err
 }
 
 func (lister *endpointLister) GetEndpointSlices(namespace, svcName string) 
(Endpoint, error) {
-       if lister.epsLister != nil {
+       if lister.epsLister == nil {
                panic("not a endpointSlice lister")
        }
        selector := labels.SelectorFromSet(labels.Set{
@@ -174,3 +174,10 @@ func NewEndpoint(ep *corev1.Endpoints) Endpoint {
                endpoint: ep,
        }
 }
+
+// NewEndpointWithSlice creates an Endpoint which entity is Kubernetes 
EndpointSlices.
+func NewEndpointWithSlice(ep *discoveryv1.EndpointSlice) Endpoint {
+       return &endpoint{
+               endpointSlices: []*discoveryv1.EndpointSlice{ep},
+       }
+}
diff --git a/pkg/kube/translation/translator_test.go 
b/pkg/kube/translation/translator_test.go
index aa9bd70..f744403 100644
--- a/pkg/kube/translation/translator_test.go
+++ b/pkg/kube/translation/translator_test.go
@@ -16,9 +16,11 @@ package translation
 
 import (
        "context"
-       "github.com/apache/apisix-ingress-controller/pkg/kube"
        "testing"
 
+       "github.com/apache/apisix-ingress-controller/pkg/kube"
+       discoveryv1 "k8s.io/api/discovery/v1"
+
        apisixv1 
"github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
        "github.com/stretchr/testify/assert"
        corev1 "k8s.io/api/core/v1"
@@ -213,3 +215,132 @@ func TestTranslateUpstreamNodes(t *testing.T) {
                },
        })
 }
+
+func TestTranslateUpstreamNodesWithEndpointSlices(t *testing.T) {
+       svc := &corev1.Service{
+               TypeMeta: metav1.TypeMeta{},
+               ObjectMeta: metav1.ObjectMeta{
+                       Name:      "svc",
+                       Namespace: "test",
+               },
+               Spec: corev1.ServiceSpec{
+                       Ports: []corev1.ServicePort{
+                               {
+                                       Name: "port1",
+                                       Port: 80,
+                                       TargetPort: intstr.IntOrString{
+                                               Type:   intstr.Int,
+                                               IntVal: 9080,
+                                       },
+                               },
+                               {
+                                       Name: "port2",
+                                       Port: 443,
+                                       TargetPort: intstr.IntOrString{
+                                               Type:   intstr.Int,
+                                               IntVal: 9443,
+                                       },
+                               },
+                       },
+               },
+       }
+       isTrue := true
+       port1 := int32(9080)
+       port2 := int32(9443)
+       port1Name := "port1"
+       port2Name := "port2"
+       ep := &discoveryv1.EndpointSlice{
+               ObjectMeta: metav1.ObjectMeta{
+                       Name:      "svc",
+                       Namespace: "test",
+                       Labels: map[string]string{
+                               discoveryv1.LabelManagedBy:   
"endpointslice-controller.k8s.io",
+                               discoveryv1.LabelServiceName: "svc",
+                       },
+               },
+               AddressType: discoveryv1.AddressTypeIPv4,
+               Endpoints: []discoveryv1.Endpoint{
+                       {
+                               Addresses: []string{
+                                       "192.168.1.1",
+                                       "192.168.1.2",
+                               },
+                               Conditions: discoveryv1.EndpointConditions{
+                                       Ready: &isTrue,
+                               },
+                       },
+               },
+               Ports: []discoveryv1.EndpointPort{
+                       {
+                               Name: &port1Name,
+                               Port: &port1,
+                       },
+                       {
+                               Name: &port2Name,
+                               Port: &port2,
+                       },
+               },
+       }
+
+       client := fake.NewSimpleClientset()
+       informersFactory := informers.NewSharedInformerFactory(client, 0)
+       svcInformer := informersFactory.Core().V1().Services().Informer()
+       svcLister := informersFactory.Core().V1().Services().Lister()
+
+       processCh := make(chan struct{})
+       svcInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+               AddFunc: func(obj interface{}) {
+                       processCh <- struct{}{}
+               },
+       })
+
+       stopCh := make(chan struct{})
+       defer close(stopCh)
+       go svcInformer.Run(stopCh)
+       cache.WaitForCacheSync(stopCh, svcInformer.HasSynced)
+
+       _, err := client.CoreV1().Services("test").Create(context.Background(), 
svc, metav1.CreateOptions{})
+       assert.Nil(t, err)
+
+       tr := &translator{&TranslatorOptions{
+               ServiceLister: svcLister,
+       }}
+       <-processCh
+
+       nodes, err := tr.TranslateUpstreamNodes(kube.NewEndpointWithSlice(ep), 
10080, nil)
+       assert.Nil(t, nodes)
+       assert.Equal(t, err, &translateError{
+               field:  "service.spec.ports",
+               reason: "port not defined",
+       })
+
+       nodes, err = tr.TranslateUpstreamNodes(kube.NewEndpointWithSlice(ep), 
80, nil)
+       assert.Nil(t, err)
+       assert.Equal(t, nodes, apisixv1.UpstreamNodes{
+               {
+                       Host:   "192.168.1.1",
+                       Port:   9080,
+                       Weight: 100,
+               },
+               {
+                       Host:   "192.168.1.2",
+                       Port:   9080,
+                       Weight: 100,
+               },
+       })
+
+       nodes, err = tr.TranslateUpstreamNodes(kube.NewEndpointWithSlice(ep), 
443, nil)
+       assert.Nil(t, err)
+       assert.Equal(t, nodes, apisixv1.UpstreamNodes{
+               {
+                       Host:   "192.168.1.1",
+                       Port:   9443,
+                       Weight: 100,
+               },
+               {
+                       Host:   "192.168.1.2",
+                       Port:   9443,
+                       Weight: 100,
+               },
+       })
+}
diff --git a/samples/deploy/rbac/apisix_view_clusterrole.yaml 
b/samples/deploy/rbac/apisix_view_clusterrole.yaml
index ef9d342..7a9ff16 100644
--- a/samples/deploy/rbac/apisix_view_clusterrole.yaml
+++ b/samples/deploy/rbac/apisix_view_clusterrole.yaml
@@ -157,3 +157,11 @@ rules:
   - leases
   verbs:
   - '*'
+- apiGroups:
+    - discovery.k8s.io
+  resources:
+    - endpointslices
+  verbs:
+    - get
+    - list
+    - watch
diff --git a/test/e2e/scaffold/ingress.go b/test/e2e/scaffold/ingress.go
index 578907a..2b97082 100644
--- a/test/e2e/scaffold/ingress.go
+++ b/test/e2e/scaffold/ingress.go
@@ -168,6 +168,14 @@ rules:
     - leases
     verbs:
     - '*'
+  - apiGroups:
+    - discovery.k8s.io
+    resources:
+    - endpointslices
+    verbs:
+    - get
+    - list
+    - watch
 `
        _clusterRoleBinding = `
 apiVersion: rbac.authorization.k8s.io/v1
@@ -256,6 +264,7 @@ spec:
             - %s,kube-system
             - --apisix-route-version
             - %s
+            - --watch-endpointslices
       serviceAccount: ingress-apisix-e2e-test-service-account
 `
 )

Reply via email to