This is an automated email from the ASF dual-hosted git repository.

kvn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git


The following commit(s) were added to refs/heads/master by this push:
     new a89be23  feat: subset translation (#497)
a89be23 is described below

commit a89be230989ea62d03062181626cc197df655a78
Author: Alex Zhang <[email protected]>
AuthorDate: Mon May 31 10:59:26 2021 +0800

    feat: subset translation (#497)
---
 pkg/kube/apisix/apis/config/v2alpha1/types.go |  6 +++++
 pkg/kube/translation/apisix_route.go          |  9 +++----
 pkg/kube/translation/ingress.go               |  4 +--
 pkg/kube/translation/plugin.go                |  2 +-
 pkg/kube/translation/translator.go            | 31 +++++++++++++++++++++---
 pkg/kube/translation/util.go                  | 35 +++++++++++++++++++++++++--
 samples/deploy/crd/v1beta1/ApisixRoute.yaml   |  6 +++++
 7 files changed, 79 insertions(+), 14 deletions(-)

diff --git a/pkg/kube/apisix/apis/config/v2alpha1/types.go 
b/pkg/kube/apisix/apis/config/v2alpha1/types.go
index 1fdf482..c978e92 100644
--- a/pkg/kube/apisix/apis/config/v2alpha1/types.go
+++ b/pkg/kube/apisix/apis/config/v2alpha1/types.go
@@ -174,6 +174,9 @@ type ApisixRouteHTTPBackend struct {
        ResolveGranularity string `json:"resolveGranularity" 
yaml:"resolveGranularity"`
        // Weight of this backend.
        Weight *int `json:"weight" yaml:"weight"`
+       // Subset specifies a subset for the target Service. The subset should be pre-defined
+       // in the ApisixUpstream for this service.
+       Subset string `json:"subset" yaml:"subset"`
 }
 
 // ApisixRouteHTTPPlugin represents an APISIX plugin.
@@ -232,6 +235,9 @@ type ApisixRouteTCPBackend struct {
        // wise, the service ClusterIP or ExternalIP will be used,
        // default is endpoints.
        ResolveGranularity string `json:"resolveGranularity" 
yaml:"resolveGranularity"`
+       // Subset specifies a subset for the target Service. The subset should be pre-defined
+       // in the ApisixUpstream for this service.
+       Subset string `json:"subset" yaml:"subset"`
 }
 
 // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
diff --git a/pkg/kube/translation/apisix_route.go 
b/pkg/kube/translation/apisix_route.go
index dceda4f..2360d72 100644
--- a/pkg/kube/translation/apisix_route.go
+++ b/pkg/kube/translation/apisix_route.go
@@ -64,7 +64,7 @@ func (t *translator) TranslateRouteV1(ar 
*configv1.ApisixRoute) (*TranslateConte
                        route.UpstreamId = id.GenID(upstreamName)
 
                        if !ctx.checkUpstreamExist(upstreamName) {
-                               ups, err := t.TranslateUpstream(ar.Namespace, 
p.Backend.ServiceName, int32(p.Backend.ServicePort))
+                               ups, err := t.TranslateUpstream(ar.Namespace, 
p.Backend.ServiceName, "", int32(p.Backend.ServicePort))
                                if err != nil {
                                        return nil, err
                                }
@@ -181,7 +181,7 @@ func (t *translator) translateHTTPRoute(ctx 
*TranslateContext, ar *configv2alpha
                }
                ctx.addRoute(route)
                if !ctx.checkUpstreamExist(upstreamName) {
-                       ups, err := t.translateUpstream(ar.Namespace, 
backend.ServiceName, backend.ResolveGranularity, svcClusterIP, svcPort)
+                       ups, err := t.translateUpstream(ar.Namespace, 
backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, 
svcPort)
                        if err != nil {
                                return err
                        }
@@ -312,10 +312,7 @@ func (t *translator) translateTCPRoute(ctx 
*TranslateContext, ar *configv2alpha1
                name := apisixv1.ComposeStreamRouteName(ar.Namespace, ar.Name, 
part.Name)
                sr.ID = id.GenID(name)
                sr.ServerPort = part.Match.IngressPort
-               // TODO use upstream id to refer the upstream object.
-               // Currently, APISIX doesn't use upstream_id field in
-               // APISIX, so we have to embed the entire upstream.
-               ups, err := t.translateUpstream(ar.Namespace, 
backend.ServiceName, backend.ResolveGranularity, svcClusterIP, svcPort)
+               ups, err := t.translateUpstream(ar.Namespace, 
backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, 
svcPort)
                if err != nil {
                        return err
                }
diff --git a/pkg/kube/translation/ingress.go b/pkg/kube/translation/ingress.go
index ab0ea6f..5169b68 100644
--- a/pkg/kube/translation/ingress.go
+++ b/pkg/kube/translation/ingress.go
@@ -169,7 +169,7 @@ func (t *translator) 
translateUpstreamFromIngressV1(namespace string, backend *n
        } else {
                svcPort = backend.Port.Number
        }
-       ups, err := t.TranslateUpstream(namespace, backend.Name, svcPort)
+       ups, err := t.TranslateUpstream(namespace, backend.Name, "", svcPort)
        if err != nil {
                return nil, err
        }
@@ -260,7 +260,7 @@ func (t *translator) 
translateUpstreamFromIngressV1beta1(namespace string, svcNa
        } else {
                portNumber = svcPort.IntVal
        }
-       ups, err := t.TranslateUpstream(namespace, svcName, portNumber)
+       ups, err := t.TranslateUpstream(namespace, svcName, "", portNumber)
        if err != nil {
                return nil, err
        }
diff --git a/pkg/kube/translation/plugin.go b/pkg/kube/translation/plugin.go
index d58379d..3cc6490 100644
--- a/pkg/kube/translation/plugin.go
+++ b/pkg/kube/translation/plugin.go
@@ -38,7 +38,7 @@ func (t *translator) translateTrafficSplitPlugin(ctx 
*TranslateContext, ar *conf
                if err != nil {
                        return nil, err
                }
-               ups, err := t.translateUpstream(ar.Namespace, 
backend.ServiceName, backend.ResolveGranularity, svcClusterIP, svcPort)
+               ups, err := t.translateUpstream(ar.Namespace, 
backend.ServiceName, backend.Subset, backend.ResolveGranularity, svcClusterIP, 
svcPort)
                if err != nil {
                        return nil, err
                }
diff --git a/pkg/kube/translation/translator.go 
b/pkg/kube/translation/translator.go
index e15a6f7..aed3251 100644
--- a/pkg/kube/translation/translator.go
+++ b/pkg/kube/translation/translator.go
@@ -25,6 +25,7 @@ import (
        configv1 
"github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
        configv2alpha1 
"github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2alpha1"
        listersv1 
"github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v1"
+       "github.com/apache/apisix-ingress-controller/pkg/types"
        apisixv1 
"github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
 )
 
@@ -54,7 +55,11 @@ type Translator interface {
        // The returned Upstream doesn't have metadata info.
        // It doesn't assign any metadata fields, so it's caller's 
responsibility to decide
        // the metadata.
-       TranslateUpstream(string, string, int32) (*apisixv1.Upstream, error)
+       // Note the subset is used to filter the ultimate node list: only pods whose labels
+       // match the subset labels (defined in ApisixUpstream) will be selected.
+       // When the subset is not found, the node list will be empty. When the subset is empty,
+       // all pod IPs will be filled.
+       TranslateUpstream(string, string, string, int32) (*apisixv1.Upstream, 
error)
        // TranslateIngress composes a couple of APISIX Routes and upstreams 
according
        // to the given Ingress resource.
        TranslateIngress(kube.Ingress) (*TranslateContext, error)
@@ -77,6 +82,7 @@ type Translator interface {
 // TranslatorOptions contains options to help Translator
 // work well.
 type TranslatorOptions struct {
+       PodCache             types.PodCache
        PodLister            listerscorev1.PodLister
        EndpointsLister      listerscorev1.EndpointsLister
        ServiceLister        listerscorev1.ServiceLister
@@ -112,7 +118,7 @@ func (t *translator) TranslateUpstreamConfig(au 
*configv1.ApisixUpstreamConfig)
        return ups, nil
 }
 
-func (t *translator) TranslateUpstream(namespace, name string, port int32) 
(*apisixv1.Upstream, error) {
+func (t *translator) TranslateUpstream(namespace, name, subset string, port 
int32) (*apisixv1.Upstream, error) {
        endpoints, err := t.EndpointsLister.Endpoints(namespace).Get(name)
        if err != nil {
                return nil, &translateError{
@@ -128,7 +134,13 @@ func (t *translator) TranslateUpstream(namespace, name 
string, port int32) (*api
        au, err := t.ApisixUpstreamLister.ApisixUpstreams(namespace).Get(name)
        if err != nil {
                if k8serrors.IsNotFound(err) {
-                       ups.Nodes = nodes
+                       // If subset in ApisixRoute is not empty but the ApisixUpstream resource is not found,
+                       // just set an empty node list.
+                       if subset != "" {
+                               ups.Nodes = apisixv1.UpstreamNodes{}
+                       } else {
+                               ups.Nodes = nodes
+                       }
                        return ups, nil
                }
                return nil, &translateError{
@@ -136,6 +148,19 @@ func (t *translator) TranslateUpstream(namespace, name 
string, port int32) (*api
                        reason: err.Error(),
                }
        }
+
+       // Filter nodes by subset.
+       if subset != "" {
+               var labels types.Labels
+               for _, ss := range au.Spec.Subsets {
+                       if ss.Name == subset {
+                               labels = ss.Labels
+                               break
+                       }
+               }
+               nodes = t.filterNodesByLabels(nodes, labels, au.Namespace)
+       }
+
        upsCfg := &au.Spec.ApisixUpstreamConfig
        for _, pls := range au.Spec.PortLevelSettings {
                if pls.Port == port {
diff --git a/pkg/kube/translation/util.go b/pkg/kube/translation/util.go
index 16ced5a..3584205 100644
--- a/pkg/kube/translation/util.go
+++ b/pkg/kube/translation/util.go
@@ -24,6 +24,7 @@ import (
        "github.com/apache/apisix-ingress-controller/pkg/id"
        configv2alpha1 
"github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v2alpha1"
        "github.com/apache/apisix-ingress-controller/pkg/log"
+       "github.com/apache/apisix-ingress-controller/pkg/types"
        apisixv1 
"github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
 )
 
@@ -109,8 +110,8 @@ loop:
        return svc.Spec.ClusterIP, svcPort, nil
 }
 
-func (t *translator) translateUpstream(namespace, svcName, 
svcResolveGranularity, svcClusterIP string, svcPort int32) (*apisixv1.Upstream, 
error) {
-       ups, err := t.TranslateUpstream(namespace, svcName, svcPort)
+func (t *translator) translateUpstream(namespace, svcName, subset, 
svcResolveGranularity, svcClusterIP string, svcPort int32) (*apisixv1.Upstream, 
error) {
+       ups, err := t.TranslateUpstream(namespace, svcName, subset, svcPort)
        if err != nil {
                return nil, err
        }
@@ -128,6 +129,36 @@ func (t *translator) translateUpstream(namespace, svcName, 
svcResolveGranularity
        return ups, nil
 }
 
+func (t *translator) filterNodesByLabels(nodes apisixv1.UpstreamNodes, labels 
types.Labels, namespace string) apisixv1.UpstreamNodes {
+       if labels == nil {
+               return nodes
+       }
+
+       var filteredNodes apisixv1.UpstreamNodes
+       for _, node := range nodes {
+               podName, err := t.PodCache.GetNameByIP(node.Host)
+               if err != nil {
+                       log.Errorw("failed to find pod name by ip, ignore it",
+                               zap.Error(err),
+                               zap.String("pod_ip", node.Host),
+                       )
+                       continue
+               }
+               pod, err := t.PodLister.Pods(namespace).Get(podName)
+               if err != nil {
+                       log.Errorw("failed to find pod, ignore it",
+                               zap.Error(err),
+                               zap.String("pod_name", podName),
+                       )
+                       continue
+               }
+               if labels.IsSubsetOf(pod.Labels) {
+                       filteredNodes = append(filteredNodes, node)
+               }
+       }
+       return filteredNodes
+}
+
 func validateRemoteAddrs(remoteAddrs []string) error {
        for _, addr := range remoteAddrs {
                if ip := net.ParseIP(addr); ip == nil {
diff --git a/samples/deploy/crd/v1beta1/ApisixRoute.yaml 
b/samples/deploy/crd/v1beta1/ApisixRoute.yaml
index 89f4e1a..974795e 100644
--- a/samples/deploy/crd/v1beta1/ApisixRoute.yaml
+++ b/samples/deploy/crd/v1beta1/ApisixRoute.yaml
@@ -166,6 +166,8 @@ spec:
                       weight:
                         type: integer
                         minimum: 0
+                      subset:
+                        type: string
                     required:
                       - serviceName
                       - servicePort
@@ -188,6 +190,8 @@ spec:
                         weight:
                           type: integer
                           minimum: 0
+                        subset:
+                          type: string
                     required:
                       - serviceName
                       - servicePort
@@ -238,6 +242,8 @@ spec:
                       resolveGranualrity:
                         type: string
                         enum: ["endpoint", "service"]
+                      subset:
+                        type: string
                     required:
                       - serviceName
                       - servicePort

Reply via email to