This is an automated email from the ASF dual-hosted git repository.
tokers pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push:
new 8ca5075 chore: new apisix upstream controller (#236)
8ca5075 is described below
commit 8ca50759480940120328bf99bc476e19d9599c39
Author: Alex Zhang <[email protected]>
AuthorDate: Sun Feb 7 13:57:02 2021 +0800
chore: new apisix upstream controller (#236)
---
pkg/apisix/resource.go | 4 +-
pkg/apisix/upstream_test.go | 2 +-
pkg/ingress/apisix/route.go | 25 +-
pkg/ingress/apisix/upstream.go | 114 --------
pkg/ingress/apisix/upstream_test.go | 111 -------
pkg/ingress/controller/apisix_route.go | 8 +-
pkg/ingress/controller/apisix_upstream.go | 320 ++++++++++++---------
pkg/ingress/controller/controller.go | 54 ++--
pkg/ingress/controller/endpoint.go | 72 ++---
pkg/ingress/endpoint/ep.go | 61 ----
pkg/kube/apisix/apis/config/v1/types.go | 41 ++-
.../apisix/apis/config/v1/zz_generated.deepcopy.go | 42 ++-
pkg/kube/translator.go | 204 +++++++++++++
pkg/kube/translator_test.go | 214 ++++++++++++++
pkg/types/apisix/v1/types.go | 50 +++-
pkg/types/apisix/v1/zz_generated.deepcopy.go | 34 +--
pkg/types/event.go | 7 +-
test/e2e/endpoints/endpoints.go | 70 ++++-
test/e2e/features/scheme.go | 12 +-
test/e2e/scaffold/scaffold.go | 2 +
20 files changed, 882 insertions(+), 565 deletions(-)
diff --git a/pkg/apisix/resource.go b/pkg/apisix/resource.go
index 19aa87d..cb0f3a3 100644
--- a/pkg/apisix/resource.go
+++ b/pkg/apisix/resource.go
@@ -126,9 +126,9 @@ func (i *item) upstream(clusterName string) (*v1.Upstream, error) {
return nil, err
}
- var nodes []v1.Node
+ var nodes []v1.UpstreamNode
for _, node := range ups.Nodes {
- nodes = append(nodes, v1.Node{
+ nodes = append(nodes, v1.UpstreamNode{
IP: node.Host,
Port: node.Port,
Weight: node.Weight,
diff --git a/pkg/apisix/upstream_test.go b/pkg/apisix/upstream_test.go
index d287b59..9fa7045 100644
--- a/pkg/apisix/upstream_test.go
+++ b/pkg/apisix/upstream_test.go
@@ -165,7 +165,7 @@ func TestUpstreamClient(t *testing.T) {
ip := "10.0.11.153"
port := 15006
weight := 100
- nodes := []v1.Node{
+ nodes := []v1.UpstreamNode{
{
IP: ip,
Port: port,
diff --git a/pkg/ingress/apisix/route.go b/pkg/ingress/apisix/route.go
index 64d57d5..2487657 100644
--- a/pkg/ingress/apisix/route.go
+++ b/pkg/ingress/apisix/route.go
@@ -17,7 +17,7 @@ package apisix
import (
"strconv"
- "github.com/apache/apisix-ingress-controller/pkg/ingress/endpoint"
+ "github.com/apache/apisix-ingress-controller/pkg/kube"
	configv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
"github.com/apache/apisix-ingress-controller/pkg/seven/conf"
apisix "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
@@ -37,7 +37,7 @@ const (
type ApisixRoute configv1.ApisixRoute
// Convert convert to apisix.Route from ingress.ApisixRoute CRD
-func (ar *ApisixRoute) Convert() ([]*apisix.Route, []*apisix.Service, []*apisix.Upstream, error) {
+func (ar *ApisixRoute) Convert(translator kube.Translator) ([]*apisix.Route, []*apisix.Service, []*apisix.Upstream, error) {
ns := ar.Namespace
// meta annotation
plugins, group := BuildAnnotation(ar.Annotations)
@@ -125,20 +125,15 @@ func (ar *ApisixRoute) Convert() ([]*apisix.Route, []*apisix.Service, []*apisix.
if group != "" {
				fullUpstreamName = group + "_" + apisixUpstreamName
}
- LBType := DefaultLBType
- port, _ := strconv.Atoi(svcPort)
- nodes := endpoint.BuildEps(ns, svcName, port)
- upstream := &apisix.Upstream{
- Metadata: apisix.Metadata{
- FullName: fullUpstreamName,
- Group: group,
- ResourceVersion: rv,
- Name: apisixUpstreamName,
- },
- Type: LBType,
- Nodes: nodes,
+			ups, err := translator.TranslateUpstream(ns, svcName, int32(p.Backend.ServicePort))
+ if err != nil {
+ return nil, nil, nil, err
}
- upstreamMap[upstream.FullName] = upstream
+ ups.FullName = fullUpstreamName
+ ups.Group = group
+ ups.ResourceVersion = rv
+ ups.Name = apisixUpstreamName
+ upstreamMap[ups.FullName] = ups
}
}
for _, s := range serviceMap {
diff --git a/pkg/ingress/apisix/upstream.go b/pkg/ingress/apisix/upstream.go
deleted file mode 100644
index a5970e6..0000000
--- a/pkg/ingress/apisix/upstream.go
+++ /dev/null
@@ -1,114 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package apisix
-
-import (
- "errors"
- "fmt"
- "strconv"
-
- "github.com/apache/apisix-ingress-controller/pkg/ingress/endpoint"
-	configv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
- "github.com/apache/apisix-ingress-controller/pkg/seven/conf"
- apisix "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
-)
-
-const (
- ApisixUpstream = "ApisixUpstream"
-)
-
-//type ApisixUpstreamCRD ingress.ApisixUpstream
-
-type ApisixUpstreamBuilder struct {
- CRD *configv1.ApisixUpstream
- Ep endpoint.Endpoint
-}
-
-// Convert convert to apisix.Route from ingress.ApisixRoute CRD
-func (aub *ApisixUpstreamBuilder) Convert() ([]*apisix.Upstream, error) {
- ar := aub.CRD
- ns := ar.Namespace
- name := ar.Name
- // meta annotation
- _, group := BuildAnnotation(ar.Annotations)
- conf.AddGroup(group)
-
- upstreams := make([]*apisix.Upstream, 0)
- rv := ar.ObjectMeta.ResourceVersion
- Ports := ar.Spec.Ports
- for _, r := range Ports {
-		if r.Scheme != "" && r.Scheme != configv1.SchemeHTTP && r.Scheme != configv1.SchemeGRPC {
- return nil, fmt.Errorf("bad scheme %s", r.Scheme)
- }
-
- port := r.Port
-		// apisix route name = namespace_svcName_svcPort = apisix service name
-		apisixUpstreamName := ns + "_" + name + "_" + strconv.Itoa(int(port))
-
- lb := r.LoadBalancer
-
- //nodes := endpoint.BuildEps(ns, name, int(port))
- nodes := aub.Ep.BuildEps(ns, name, port)
- fromKind := ApisixUpstream
-
- // fullName
- fullName := apisixUpstreamName
- if group != "" {
- fullName = group + "_" + apisixUpstreamName
- }
- upstream := &apisix.Upstream{
- Metadata: apisix.Metadata{
- FullName: fullName,
- Group: group,
- ResourceVersion: rv,
- Name: apisixUpstreamName,
- },
- Nodes: nodes,
- FromKind: fromKind,
- }
- if r.Scheme != "" {
- upstream.Scheme = r.Scheme
- }
- if lb == nil || lb.Type == "" {
- upstream.Type = apisix.LbRoundRobin
- } else {
- switch lb.Type {
-			case apisix.LbRoundRobin, apisix.LbLeastConn, apisix.LbEwma:
- upstream.Type = lb.Type
- case apisix.LbConsistentHash:
- upstream.Type = lb.Type
- upstream.Key = lb.Key
- switch lb.HashOn {
- case apisix.HashOnVars:
- fallthrough
- case apisix.HashOnHeader:
- fallthrough
- case apisix.HashOnCookie:
- fallthrough
- case apisix.HashOnConsumer:
- fallthrough
- case apisix.HashOnVarsCombination:
- upstream.HashOn = lb.HashOn
- default:
-				return nil, errors.New("invalid hashOn value")
- }
- default:
-			return nil, errors.New("invalid load balancer type")
- }
- }
- upstreams = append(upstreams, upstream)
- }
- return upstreams, nil
-}
diff --git a/pkg/ingress/apisix/upstream_test.go b/pkg/ingress/apisix/upstream_test.go
deleted file mode 100644
index 548d18c..0000000
--- a/pkg/ingress/apisix/upstream_test.go
+++ /dev/null
@@ -1,111 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package apisix
-
-import (
- "fmt"
- "testing"
-
- "github.com/stretchr/testify/assert"
- "gopkg.in/yaml.v2"
-
-	configv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
- v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
-)
-
-func TestApisixUpstreamCRD_Convert(t *testing.T) {
- assert := assert.New(t)
-
- // get yaml from string
- var crd configv1.ApisixUpstream
- bytes := []byte(upstreamYaml)
- if err := yaml.Unmarshal(bytes, &crd); err != nil {
- assert.Error(err)
- } else {
-		au3 := &ApisixUpstreamBuilder{CRD: &crd, Ep: &EndpointRequestTest{}} // mock endpoints
- // convert
- if upstreams, err := au3.Convert(); err != nil {
- assert.Error(err)
- } else {
- // equals or deepCompare
- upstreamExpect := buildExpectUpstream()
- //upstreamsExpect := []*v1.Upstream{upstreamExpect}
- b := equals(upstreams[0], upstreamExpect)
-			//b := reflect.DeepEqual(upstreams, []*v1.Upstream{upstreamExpect})
- if !b {
- assert.True(b, "convert upstream not expected")
-				assert.Error(fmt.Errorf("convert upstream not expect"))
- }
- t.Log("[upstream convert] ok")
- }
- }
-}
-
-func equals(s, d *v1.Upstream) bool {
- if s.Name != d.Name || s.FullName != d.FullName || s.Group != d.Group {
- return false
- }
-
-	if s.FromKind != d.FromKind || s.Type != d.Type || s.Key != d.Key || s.HashOn != d.HashOn {
- return false
- }
-
- return true
-}
-
-// mock BuildEps
-type EndpointRequestTest struct{}
-
-func (epr *EndpointRequestTest) BuildEps(ns, name string, port int) []v1.Node {
- nodes := make([]v1.Node, 0)
- return nodes
-}
-
-func buildExpectUpstream() *v1.Upstream {
- fullName := "cloud_httpserver_8080"
- LBType := "chash"
- HashOn := "header"
- Key := "hello_key"
- fromKind := "ApisixUpstream"
- group := ""
- upstreamExpect := &v1.Upstream{
- Metadata: v1.Metadata{
- Group: group,
- ResourceVersion: group,
- FullName: fullName,
- Name: fullName,
- },
- Type: LBType,
- HashOn: HashOn,
- Key: Key,
- FromKind: fromKind,
- }
- return upstreamExpect
-}
-
-var upstreamYaml = `
-kind: ApisixUpstream
-apiVersion: apisix.apache.org/v1
-metadata:
- name: httpserver
- namespace: cloud
-spec:
- ports:
- - loadbalancer:
- hashOn: header
- key: hello_key
- type: chash
- port: 8080
-`
diff --git a/pkg/ingress/controller/apisix_route.go b/pkg/ingress/controller/apisix_route.go
index fca49f7..f629d35 100644
--- a/pkg/ingress/controller/apisix_route.go
+++ b/pkg/ingress/controller/apisix_route.go
@@ -218,7 +218,7 @@ func (c *ApisixRouteController) add(key string) error {
return err
}
apisixRoute := apisix.ApisixRoute(*apisixIngressRoute)
- routes, services, upstreams, _ := apisixRoute.Convert()
+	routes, services, upstreams, _ := apisixRoute.Convert(c.controller.translator)
	comb := state.ApisixCombination{Routes: routes, Services: services, Upstreams: upstreams}
_, err = comb.Solver()
return err
@@ -246,10 +246,10 @@ func (c *ApisixRouteController) sync(rqo *RouteQueueObj) error {
return err // if error occurred, return
}
oldApisixRoute := apisix.ApisixRoute(*rqo.OldObj)
- oldRoutes, _, _, _ := oldApisixRoute.Convert()
+		oldRoutes, _, _, _ := oldApisixRoute.Convert(c.controller.translator)
		newApisixRoute := apisix.ApisixRoute(*apisixIngressRoute)
-		newRoutes, _, _, _ := newApisixRoute.Convert()
+		newRoutes, _, _, _ := newApisixRoute.Convert(c.controller.translator)
		rc := &state.RouteCompare{OldRoutes: oldRoutes, NewRoutes: newRoutes}
return rc.Sync()
@@ -260,7 +260,7 @@ func (c *ApisixRouteController) sync(rqo *RouteQueueObj) error {
return nil
}
apisixRoute := apisix.ApisixRoute(*rqo.OldObj)
- routes, services, upstreams, _ := apisixRoute.Convert()
+	routes, services, upstreams, _ := apisixRoute.Convert(c.controller.translator)
rc := &state.RouteCompare{OldRoutes: routes, NewRoutes: nil}
if err := rc.Sync(); err != nil {
return err
diff --git a/pkg/ingress/controller/apisix_upstream.go b/pkg/ingress/controller/apisix_upstream.go
index 498919e..eed800a 100644
--- a/pkg/ingress/controller/apisix_upstream.go
+++ b/pkg/ingress/controller/apisix_upstream.go
@@ -15,202 +15,244 @@
package controller
import (
- "fmt"
- "time"
-
- "k8s.io/apimachinery/pkg/api/errors"
- "k8s.io/apimachinery/pkg/util/runtime"
- "k8s.io/apimachinery/pkg/util/wait"
- "k8s.io/client-go/kubernetes"
- "k8s.io/client-go/kubernetes/scheme"
+ "context"
+
+ "go.uber.org/zap"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
- "github.com/apache/apisix-ingress-controller/pkg/ingress/apisix"
- "github.com/apache/apisix-ingress-controller/pkg/ingress/endpoint"
+	apisixcache "github.com/apache/apisix-ingress-controller/pkg/apisix/cache"
	configv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
-	clientset "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/clientset/versioned"
-	apisixscheme "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/clientset/versioned/scheme"
-	informersv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/informers/externalversions/config/v1"
-	listersv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v1"
	"github.com/apache/apisix-ingress-controller/pkg/log"
-	"github.com/apache/apisix-ingress-controller/pkg/seven/state"
+	"github.com/apache/apisix-ingress-controller/pkg/types"
+	apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)
-type ApisixUpstreamController struct {
- controller *Controller
- kubeclientset kubernetes.Interface
- apisixClientset clientset.Interface
- apisixUpstreamList listersv1.ApisixUpstreamLister
- apisixUpstreamSynced cache.InformerSynced
- workqueue workqueue.RateLimitingInterface
+type apisixUpstreamController struct {
+ controller *Controller
+ workqueue workqueue.RateLimitingInterface
+ workers int
}
-func BuildApisixUpstreamController(
- kubeclientset kubernetes.Interface,
- apisixUpstreamClientset clientset.Interface,
- apisixUpstreamInformer informersv1.ApisixUpstreamInformer,
- root *Controller) *ApisixUpstreamController {
-
- runtime.Must(apisixscheme.AddToScheme(scheme.Scheme))
- controller := &ApisixUpstreamController{
- controller: root,
- kubeclientset: kubeclientset,
- apisixClientset: apisixUpstreamClientset,
- apisixUpstreamList: apisixUpstreamInformer.Lister(),
-		apisixUpstreamSynced: apisixUpstreamInformer.Informer().HasSynced,
-		workqueue:            workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "ApisixUpstreams"),
- }
- apisixUpstreamInformer.Informer().AddEventHandler(
+func (c *Controller) newApisixUpstreamController() *apisixUpstreamController {
+ ctl := &apisixUpstreamController{
+ controller: c,
+		workqueue:  workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ApisixUpstream"),
+ workers: 1,
+ }
+
+ ctl.controller.apisixUpstreamInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
- AddFunc: controller.addFunc,
- UpdateFunc: controller.updateFunc,
- DeleteFunc: controller.deleteFunc,
- })
- return controller
+ AddFunc: ctl.onAdd,
+ UpdateFunc: ctl.onUpdate,
+ DeleteFunc: ctl.OnDelete,
+ },
+ )
+ return ctl
}
-func (c *ApisixUpstreamController) Run(stop <-chan struct{}) error {
-	// sync the cache
-	if ok := cache.WaitForCacheSync(stop); !ok {
-		log.Error("failed to sync the ApisixUpstream cache")
- return fmt.Errorf("failed to wait for caches to sync")
+func (c *apisixUpstreamController) run(ctx context.Context) {
+ log.Info("ApisixUpstream controller started")
+ defer log.Info("ApisixUpstream controller exited")
+	if ok := cache.WaitForCacheSync(ctx.Done(), c.controller.apisixUpstreamInformer.HasSynced, c.controller.svcInformer.HasSynced); !ok {
+ log.Errorf("cache sync failed")
+ return
}
- go wait.Until(c.runWorker, time.Second, stop)
- return nil
+ for i := 0; i < c.workers; i++ {
+ go c.runWorker(ctx)
+ }
+
+ <-ctx.Done()
+ c.workqueue.ShutDown()
}
-func (c *ApisixUpstreamController) runWorker() {
- for c.processNextWorkItem() {
+func (c *apisixUpstreamController) runWorker(ctx context.Context) {
+ for {
+ obj, quit := c.workqueue.Get()
+ if quit {
+ return
+ }
+ err := c.sync(ctx, obj.(*types.Event))
+ c.workqueue.Done(obj)
+ c.handleSyncErr(obj, err)
}
}
-func (c *ApisixUpstreamController) processNextWorkItem() bool {
- defer recoverException()
- obj, shutdown := c.workqueue.Get()
- if shutdown {
- return false
+func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) error {
+ key := ev.Object.(string)
+ namespace, name, err := cache.SplitMetaNamespaceKey(key)
+ if err != nil {
+		log.Errorf("found ApisixUpstream resource with invalid meta namespace key %s: %s", key, err)
+ return err
}
- err := func(obj interface{}) error {
- defer c.workqueue.Done(obj)
- var sqo *UpstreamQueueObj
- var ok bool
- if sqo, ok = obj.(*UpstreamQueueObj); !ok {
- c.workqueue.Forget(obj)
-			return fmt.Errorf("expected string in workqueue but got %#v", obj)
+	au, err := c.controller.apisixUpstreamLister.ApisixUpstreams(namespace).Get(name)
+ if err != nil {
+ if !k8serrors.IsNotFound(err) {
+			log.Errorf("failed to get ApisixUpstream %s: %s", key, err)
+ return err
}
-		// handle the business logic in syncHandler
-		if err := c.syncHandler(sqo); err != nil {
-			c.workqueue.AddRateLimited(obj)
-			return fmt.Errorf("error syncing '%s': %s", sqo.Key, err.Error())
+ if ev.Type != types.EventDelete {
+			log.Warnf("ApisixUpstream %s was deleted before it can be delivered", key)
+ // Don't need retry.
+ return nil
}
+ }
+ if ev.Type == types.EventDelete {
+ if au != nil {
+			// We still find the resource while we are processing the DELETE event,
+			// that means object with same namespace and name was created, discarding
+			// this stale DELETE event.
+			log.Warnf("discard the stale ApisixUpstream DELETE event since the %s exists", key)
+ return nil
+ }
+ au = ev.Tombstone.(*configv1.ApisixUpstream)
+ }
- c.workqueue.Forget(obj)
- return nil
- }(obj)
- if err != nil {
- runtime.HandleError(err)
+ var portLevelSettings map[int32]*configv1.ApisixUpstreamConfig
+ if len(au.Spec.PortLevelSettings) > 0 {
+		portLevelSettings = make(map[int32]*configv1.ApisixUpstreamConfig, len(au.Spec.PortLevelSettings))
+		for _, port := range au.Spec.PortLevelSettings {
+			portLevelSettings[port.Port] = &port.ApisixUpstreamConfig
+ }
}
- return true
-}
-func (c *ApisixUpstreamController) syncHandler(sqo *UpstreamQueueObj) error {
- namespace, name, err := cache.SplitMetaNamespaceKey(sqo.Key)
+ svc, err := c.controller.svcLister.Services(namespace).Get(name)
if err != nil {
- log.Errorf("invalid resource key: %s", sqo.Key)
- return fmt.Errorf("invalid resource key: %s", sqo.Key)
- }
- apisixUpstreamYaml := sqo.OldObj
- if sqo.Ope == DELETE {
-		apisixIngressUpstream, _ := c.apisixUpstreamList.ApisixUpstreams(namespace).Get(name)
-		if apisixIngressUpstream != nil && apisixIngressUpstream.ResourceVersion > sqo.OldObj.ResourceVersion {
-			log.Warnf("Upstream %s has been covered when retry", sqo.Key)
- return nil
- }
- } else {
-		apisixUpstreamYaml, err = c.apisixUpstreamList.ApisixUpstreams(namespace).Get(name)
+ log.Errorf("failed to get service %s: %s", key, err)
+ return err
+ }
+
+ for _, port := range svc.Spec.Ports {
+		upsName := apisixv1.ComposeUpstreamName(namespace, name, port.Port)
+		// TODO: multiple cluster
+		ups, err := c.controller.apisix.Cluster("").Upstream().Get(ctx, upsName)
if err != nil {
- if errors.IsNotFound(err) {
-				log.Infof("apisixUpstream %s is removed", sqo.Key)
- return nil
+ if err == apisixcache.ErrNotFound {
+ continue
}
-			runtime.HandleError(fmt.Errorf("failed to list apisixUpstream %s/%s", sqo.Key, err.Error()))
+			log.Errorf("failed to get upstream %s: %s", upsName, err)
return err
}
- }
-	aub := apisix.ApisixUpstreamBuilder{CRD: apisixUpstreamYaml, Ep: &endpoint.EndpointRequest{}}
-	upstreams, _ := aub.Convert()
-	comb := state.ApisixCombination{Routes: nil, Services: nil, Upstreams: upstreams}
- if sqo.Ope == DELETE {
- return comb.Remove()
- } else {
- _, err = comb.Solver()
- return err
- }
+ var newUps *apisixv1.Upstream
+ if ev.Type != types.EventDelete {
+ cfg, ok := portLevelSettings[port.Port]
+ if !ok {
+ cfg = &au.Spec.ApisixUpstreamConfig
+ }
+			// FIXME Same ApisixUpstreamConfig might be translated multiple times.
+			newUps, err = c.controller.translator.TranslateUpstreamConfig(cfg)
+ if err != nil {
+ log.Errorw("found malformed ApisixUpstream",
+ zap.Any("object", au),
+ zap.Error(err),
+ )
+ return err
+ }
+ } else {
+ newUps = apisixv1.NewDefaultUpstream()
+ }
+ newUps.Metadata = ups.Metadata
+ newUps.Nodes = ups.Nodes
+ log.Debugw("updating upstream since ApisixUpstream changed",
+ zap.String("event", ev.Type.String()),
+ zap.Any("upstream", newUps),
+ zap.Any("ApisixUpstream", au),
+ )
+		if _, err := c.controller.apisix.Cluster("").Upstream().Update(ctx, newUps); err != nil {
+ log.Errorw("failed to update upstream",
+ zap.Error(err),
+ zap.Any("upstream", newUps),
+ zap.Any("ApisixUpstream", au),
+ )
+ return err
+ }
+ }
+ return nil
}
-type UpstreamQueueObj struct {
- Key string `json:"key"`
- OldObj *configv1.ApisixUpstream `json:"old_obj"`
- Ope string `json:"ope"` // add / update / delete
+func (c *apisixUpstreamController) handleSyncErr(obj interface{}, err error) {
+ if err == nil {
+ c.workqueue.Forget(obj)
+ return
+ }
+ if c.workqueue.NumRequeues(obj) < _maxRetries {
+ log.Infow("sync endpoints failed, will retry",
+ zap.Any("object", obj),
+ )
+ c.workqueue.AddRateLimited(obj)
+ } else {
+ c.workqueue.Forget(obj)
+ log.Warnf("drop endpoints %+v out of the queue", obj)
+ }
}
-func (c *ApisixUpstreamController) addFunc(obj interface{}) {
- var key string
- var err error
- if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
- runtime.HandleError(err)
+func (c *apisixUpstreamController) onAdd(obj interface{}) {
+ key, err := cache.MetaNamespaceKeyFunc(obj)
+ if err != nil {
+		log.Errorf("found ApisixUpstream resource with bad meta namesapce key: %s", err)
return
}
if !c.controller.namespaceWatching(key) {
return
}
- sqo := &UpstreamQueueObj{Key: key, OldObj: nil, Ope: ADD}
- c.workqueue.AddRateLimited(sqo)
+ log.Debugw("ApisixUpstream add event arrived",
+ zap.Any("object", obj))
+
+ c.workqueue.AddRateLimited(&types.Event{
+ Type: types.EventAdd,
+ Object: key,
+ })
}
-func (c *ApisixUpstreamController) updateFunc(oldObj, newObj interface{}) {
- oldUpstream := oldObj.(*configv1.ApisixUpstream)
- newUpstream := newObj.(*configv1.ApisixUpstream)
- if oldUpstream.ResourceVersion >= newUpstream.ResourceVersion {
+func (c *apisixUpstreamController) onUpdate(oldObj, newObj interface{}) {
+ prev := oldObj.(*configv1.ApisixUpstream)
+ curr := newObj.(*configv1.ApisixUpstream)
+ if prev.ResourceVersion >= curr.ResourceVersion {
return
}
- var (
- key string
- err error
- )
- if key, err = cache.MetaNamespaceKeyFunc(newObj); err != nil {
- runtime.HandleError(err)
+ key, err := cache.MetaNamespaceKeyFunc(newObj)
+ if err != nil {
+		log.Errorf("found ApisixUpstream resource with bad meta namespace key: %s", err)
return
}
- sqo := &UpstreamQueueObj{Key: key, OldObj: oldUpstream, Ope: UPDATE}
- c.addFunc(sqo)
+ log.Debugw("ApisixUpstream update event arrived",
+ zap.Any("new object", curr),
+ zap.Any("old object", prev),
+ )
+
+ c.workqueue.AddRateLimited(&types.Event{
+ Type: types.EventUpdate,
+ Object: key,
+ })
}
-func (c *ApisixUpstreamController) deleteFunc(obj interface{}) {
- oldUpstream, ok := obj.(*configv1.ApisixUpstream)
+func (c *apisixUpstreamController) OnDelete(obj interface{}) {
+ au, ok := obj.(*configv1.ApisixUpstream)
if !ok {
- oldState, ok := obj.(cache.DeletedFinalStateUnknown)
- if !ok {
- return
- }
- oldUpstream, ok = oldState.Obj.(*configv1.ApisixUpstream)
+ tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return
}
+ au = tombstone.Obj.(*configv1.ApisixUpstream)
}
- var key string
- var err error
- key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+
+ key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
- runtime.HandleError(err)
+		log.Errorf("found ApisixUpstream resource with bad meta namesapce key: %s", err)
return
}
if !c.controller.namespaceWatching(key) {
return
}
- sqo := &UpstreamQueueObj{Key: key, OldObj: oldUpstream, Ope: DELETE}
- c.workqueue.AddRateLimited(sqo)
+ log.Debugw("ApisixUpstream delete event arrived",
+ zap.Any("final state", au),
+ )
+ c.workqueue.AddRateLimited(&types.Event{
+ Type: types.EventDelete,
+ Object: key,
+ Tombstone: au,
+ })
}
diff --git a/pkg/ingress/controller/controller.go b/pkg/ingress/controller/controller.go
index a08276d..6772637 100644
--- a/pkg/ingress/controller/controller.go
+++ b/pkg/ingress/controller/controller.go
@@ -20,6 +20,7 @@ import (
"sync"
"time"
+	listersv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v1"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -58,6 +59,7 @@ type Controller struct {
wg sync.WaitGroup
watchingNamespace map[string]struct{}
apisix apisix.APISIX
+ translator kube.Translator
apiServer *api.Server
clientset kubernetes.Interface
crdClientset crdclientset.Interface
@@ -65,11 +67,17 @@ type Controller struct {
crdController *Api6Controller
crdInformerFactory externalversions.SharedInformerFactory
- // informers and listers
- epInformer cache.SharedIndexInformer
- epLister listerscorev1.EndpointsLister
-
- endpointsController *endpointsController
+ // common informers and listers
+ epInformer cache.SharedIndexInformer
+ epLister listerscorev1.EndpointsLister
+ svcInformer cache.SharedIndexInformer
+ svcLister listerscorev1.ServiceLister
+ apisixUpstreamInformer cache.SharedIndexInformer
+ apisixUpstreamLister listersv1.ApisixUpstreamLister
+
+ // resource conrollers
+ endpointsController *endpointsController
+ apisixUpstreamController *apisixUpstreamController
}
// NewController creates an ingress apisix controller object.
@@ -118,11 +126,22 @@ func NewController(cfg *config.Config) (*Controller, error) {
crdInformerFactory: sharedInformerFactory,
watchingNamespace: watchingNamespace,
-		epInformer:         kube.CoreSharedInformerFactory.Core().V1().Endpoints().Informer(),
-		epLister:           kube.CoreSharedInformerFactory.Core().V1().Endpoints().Lister(),
+		epInformer:             kube.CoreSharedInformerFactory.Core().V1().Endpoints().Informer(),
+		epLister:               kube.CoreSharedInformerFactory.Core().V1().Endpoints().Lister(),
+		svcInformer:            kube.CoreSharedInformerFactory.Core().V1().Services().Informer(),
+		svcLister:              kube.CoreSharedInformerFactory.Core().V1().Services().Lister(),
+		apisixUpstreamInformer: sharedInformerFactory.Apisix().V1().ApisixUpstreams().Informer(),
+		apisixUpstreamLister:   sharedInformerFactory.Apisix().V1().ApisixUpstreams().Lister(),
}
+ c.translator = kube.NewTranslator(&kube.TranslatorOptions{
+ EndpointsLister: c.epLister,
+ ServiceLister: c.svcLister,
+ ApisixUpstreamLister: c.apisixUpstreamLister,
+ })
+
+ c.endpointsController = c.newEndpointsController()
+ c.apisixUpstreamController = c.newApisixUpstreamController()
-	c.endpointsController = c.newEndpointsController(c.epInformer, c.epLister)
return c, nil
}
@@ -237,10 +256,16 @@ func (c *Controller) run(ctx context.Context) {
c.goAttach(func() {
c.epInformer.Run(ctx.Done())
})
+ c.goAttach(func() {
+ c.svcInformer.Run(ctx.Done())
+ })
c.goAttach(func() {
c.endpointsController.run(ctx)
})
+ c.goAttach(func() {
+ c.apisixUpstreamController.run(ctx)
+ })
ac := &Api6Controller{
KubeClientSet: c.clientset,
@@ -252,8 +277,6 @@ func (c *Controller) run(ctx context.Context) {
// ApisixRoute
ac.ApisixRoute(c)
- // ApisixUpstream
- ac.ApisixUpstream(c)
// ApisixTLS
ac.ApisixTLS(c)
@@ -302,17 +325,6 @@ func (api6 *Api6Controller) ApisixRoute(controller *Controller) {
}
}
-func (api6 *Api6Controller) ApisixUpstream(controller *Controller) {
- auc := BuildApisixUpstreamController(
- api6.KubeClientSet,
- api6.Api6ClientSet,
- api6.SharedInformerFactory.Apisix().V1().ApisixUpstreams(),
- controller)
- if err := auc.Run(api6.Stop); err != nil {
- log.Errorf("failed to run ApisixUpstreamController: %s", err)
- }
-}
-
func (api6 *Api6Controller) ApisixTLS(controller *Controller) {
atc := BuildApisixTlsController(
api6.KubeClientSet,
diff --git a/pkg/ingress/controller/endpoint.go b/pkg/ingress/controller/endpoint.go
index c2937d6..1406b72 100644
--- a/pkg/ingress/controller/endpoint.go
+++ b/pkg/ingress/controller/endpoint.go
@@ -16,11 +16,10 @@ package controller
import (
"context"
- "fmt"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
- listerscorev1 "k8s.io/client-go/listers/core/v1"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
@@ -33,29 +32,24 @@ import (
)
const (
- _defaultNodeWeight = 100
// maxRetries is the number of times an object will be retried before
it is dropped out of the queue.
_maxRetries = 10
)
type endpointsController struct {
controller *Controller
- informer cache.SharedIndexInformer
- lister listerscorev1.EndpointsLister
workqueue workqueue.RateLimitingInterface
workers int
}
-func (c *Controller) newEndpointsController(informer cache.SharedIndexInformer, lister listerscorev1.EndpointsLister) *endpointsController {
+func (c *Controller) newEndpointsController() *endpointsController {
ctl := &endpointsController{
controller: c,
- informer: informer,
- lister: lister,
		workqueue:  workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoints"),
workers: 1,
}
- ctl.informer.AddEventHandler(
+ ctl.controller.epInformer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: ctl.onAdd,
UpdateFunc: ctl.onUpdate,
@@ -70,7 +64,7 @@ func (c *endpointsController) run(ctx context.Context) {
log.Info("endpoints controller started")
defer log.Info("endpoints controller exited")
- if ok := cache.WaitForCacheSync(ctx.Done(), c.informer.HasSynced); !ok {
+	if ok := cache.WaitForCacheSync(ctx.Done(), c.controller.epInformer.HasSynced); !ok {
log.Error("informers sync failed")
return
}
@@ -98,17 +92,39 @@ func (c *endpointsController) run(ctx context.Context) {
func (c *endpointsController) sync(ctx context.Context, ev *types.Event) error {
ep := ev.Object.(*corev1.Endpoints)
+ svc, err := c.controller.svcLister.Services(ep.Namespace).Get(ep.Name)
+ if err != nil {
+ if k8serrors.IsNotFound(err) {
+			log.Warnf("service %s/%s was deleted", ep.Namespace, ep.Name)
+ return nil
+ }
+		log.Errorf("failed to get service %s/%s: %s", ep.Namespace, ep.Name, err)
+ return err
+ }
+ portMap := make(map[string]int32)
+ for _, port := range svc.Spec.Ports {
+ portMap[port.Name] = port.Port
+ }
clusters := c.controller.apisix.ListClusters()
for _, s := range ep.Subsets {
for _, port := range s.Ports {
-			// FIXME this is wrong, we should use the port name as the key.
-			upstream := fmt.Sprintf("%s_%s_%d", ep.Namespace, ep.Name, port.Port)
+ svcPort, ok := portMap[port.Name]
+ if !ok {
+ // This shouldn't happen.
+				log.Errorf("port %s in endpoints %s/%s but not in service", port.Name, ep.Namespace, ep.Name)
+ continue
+ }
+			nodes, err := c.controller.translator.TranslateUpstreamNodes(ep, svcPort)
+ if err != nil {
+ log.Errorw("failed to translate upstream nodes",
+ zap.Error(err),
+ zap.Any("endpoints", ep),
+ zap.Int32("port", svcPort),
+ )
+ }
+			name := apisixv1.ComposeUpstreamName(ep.Namespace, ep.Name, svcPort)
for _, cluster := range clusters {
- var addresses []corev1.EndpointAddress
- if ev.Type != types.EventDelete {
- addresses = s.Addresses
- }
-				if err := c.syncToCluster(ctx, upstream, cluster, addresses, int(port.Port)); err != nil {
+				if err := c.syncToCluster(ctx, cluster, nodes, name); err != nil {
return err
}
}
@@ -117,19 +133,18 @@ func (c *endpointsController) sync(ctx context.Context, ev *types.Event) error {
return nil
}
-func (c *endpointsController) syncToCluster(ctx context.Context, upstreamName string,
-	cluster apisix.Cluster, addresses []corev1.EndpointAddress, port int) error {
-	upstream, err := cluster.Upstream().Get(ctx, upstreamName)
+func (c *endpointsController) syncToCluster(ctx context.Context, cluster apisix.Cluster, nodes []apisixv1.UpstreamNode, upsName string) error {
+ upstream, err := cluster.Upstream().Get(ctx, upsName)
if err != nil {
if err == apisixcache.ErrNotFound {
log.Warnw("upstream is not referenced",
zap.String("cluster", cluster.String()),
- zap.String("upstream", upstreamName),
+ zap.String("upstream", upsName),
)
return nil
} else {
log.Errorw("failed to get upstream",
- zap.String("upstream", upstreamName),
+ zap.String("upstream", upsName),
zap.String("cluster", cluster.String()),
zap.Error(err),
)
@@ -137,17 +152,10 @@ func (c *endpointsController) syncToCluster(ctx context.Context, upstreamName st
}
}
- nodes := make([]apisixv1.Node, 0, len(addresses))
- for _, address := range addresses {
- nodes = append(nodes, apisixv1.Node{
- IP: address.IP,
- Port: port,
- Weight: _defaultNodeWeight,
- })
- }
log.Debugw("upstream binds new nodes",
- zap.String("upstream", upstreamName),
+ zap.String("upstream", upsName),
zap.Any("nodes", nodes),
+ zap.String("cluster", cluster.String()),
)
upstream.Nodes = nodes
@@ -157,7 +165,7 @@ func (c *endpointsController) syncToCluster(ctx context.Context, upstreamName st
if _, err = comb.Solver(); err != nil {
log.Errorw("failed to sync upstream",
- zap.String("upstream", upstreamName),
+ zap.String("upstream", upsName),
zap.String("cluster", cluster.String()),
zap.Error(err),
)
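
The behavioral fix in this file: endpoint ports are matched to Service ports by name, and the upstream is keyed by the Service port rather than the raw endpoint port. A minimal sketch of that flow, with illustrative values, using only helpers introduced in this commit:

    svcPort := portMap[port.Name]                                             // e.g. 80 for endpoint port "port1"
    upsName := apisixv1.ComposeUpstreamName(ep.Namespace, ep.Name, svcPort)   // e.g. "default_httpbin_80"
    nodes, err := c.controller.translator.TranslateUpstreamNodes(ep, svcPort) // node ports keep the targetPort, e.g. 9080
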
diff --git a/pkg/ingress/endpoint/ep.go b/pkg/ingress/endpoint/ep.go
deleted file mode 100644
index 0ebcebc..0000000
--- a/pkg/ingress/endpoint/ep.go
+++ /dev/null
@@ -1,61 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package endpoint
-
-import (
- "github.com/golang/glog"
-
- "github.com/apache/apisix-ingress-controller/pkg/kube"
- v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
-)
-
-type Endpoint interface {
- BuildEps(ns, name string, port int) []v1.Node
-}
-
-type EndpointRequest struct{}
-
-func (epr *EndpointRequest) BuildEps(ns, name string, port int) []v1.Node {
- nodes := make([]v1.Node, 0)
- epInformers := kube.EndpointsInformer
- if ep, err := epInformers.Lister().Endpoints(ns).Get(name); err != nil {
-		glog.Errorf("find endpoint %s/%s err: %s", ns, name, err.Error())
- } else {
- for _, s := range ep.Subsets {
- for _, ip := range s.Addresses {
-				node := v1.Node{IP: ip.IP, Port: port, Weight: 100}
- nodes = append(nodes, node)
- }
- }
- }
- return nodes
-}
-
-// BuildEps build nodes from endpoints for upstream
-func BuildEps(ns, name string, port int) []v1.Node {
- nodes := make([]v1.Node, 0)
- epInformers := kube.EndpointsInformer
- if ep, err := epInformers.Lister().Endpoints(ns).Get(name); err != nil {
-		glog.Errorf("find endpoint %s/%s err: %s", ns, name, err.Error())
- } else {
- for _, s := range ep.Subsets {
- for _, ip := range s.Addresses {
-				node := v1.Node{IP: ip.IP, Port: port, Weight: 100}
- nodes = append(nodes, node)
- }
- }
- }
- return nodes
-}
diff --git a/pkg/kube/apisix/apis/config/v1/types.go b/pkg/kube/apisix/apis/config/v1/types.go
index ccc9fd0..b9648ea 100644
--- a/pkg/kube/apisix/apis/config/v1/types.go
+++ b/pkg/kube/apisix/apis/config/v1/types.go
@@ -74,35 +74,46 @@ type ApisixRouteList struct {
// +genclient:noStatus
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
-// ApisixUpstream is used to decorate Upstream in APISIX, such as load
-// balacing type.
+// ApisixUpstream is a decorator for Kubernetes Service, it arms the Service
+// with rich features like health check, retry policies, load balancer and others.
+// It's designed to have same name with the Kubernetes Service and can be customized
+// for individual port.
type ApisixUpstream struct {
metav1.TypeMeta `json:",inline" yaml:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty" yaml:"metadata,omitempty"`
-	Spec *ApisixUpstreamSpec `json:"spec,omitempty" yaml:"spec,omitempty"`
+
+ Spec *ApisixUpstreamSpec `json:"spec,omitempty" yaml:"spec,omitempty"`
}
-// ApisixUpstreamSpec describes the specification of Upstream in APISIX.
+// ApisixUpstreamSpec describes the specification of ApisixUpstream.
type ApisixUpstreamSpec struct {
- Ports []Port `json:"ports,omitempty"`
+ ApisixUpstreamConfig `json:",inline" yaml:",inline"`
+
+	PortLevelSettings []PortLevelSettings `json:"portLevelSettings,omitempty" yaml:"portLevelSettings,omitempty"`
}
-// Port is the port-specific configurations.
-type Port struct {
- Port int `json:"port,omitempty"`
- LoadBalancer *LoadBalancer `json:"loadbalancer,omitempty"`
+// ApisixUpstreamConfig contains rich features on APISIX Upstream, for instance
+// load balancer, health check and etc.
+type ApisixUpstreamConfig struct {
+	// LoadBalancer represents the load balancer configuration for Kubernetes Service.
+ // The default strategy is round robin.
+ // +optional
+	LoadBalancer *LoadBalancer `json:"loadbalancer,omitempty" yaml:"loadbalancer,omitempty"`
// The scheme used to talk with the upstream.
// Now value can be http, grpc.
// +optional
Scheme string `json:"scheme,omitempty" yaml:"scheme,omitempty"`
}
-var (
- // SchemeHTTP represents the HTTP protocol.
- SchemeHTTP = "http"
- // SchemeGRPC represents the GRPC protocol.
- SchemeGRPC = "grpc"
-)
+// PortLevelSettings configures the ApisixUpstreamConfig for each individual port. It inherits
+// configurations from the outer level (the whole Kubernetes Service) and overrides some of
+// them if they are set on the port level.
+type PortLevelSettings struct {
+ ApisixUpstreamConfig `json:",inline" yaml:",inline"`
+
+ // Port is a Kubernetes Service port, it should be already defined.
+ Port int32 `json:"port" yaml:"port"`
+}
// LoadBalancer describes the load balancing parameters.
type LoadBalancer struct {
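
The reshaped spec keeps Service-wide settings in the embedded ApisixUpstreamConfig and moves per-port overrides into portLevelSettings. A minimal sketch of populating it in Go, assuming the usual metav1 and configv1 imports; the names and values are illustrative only:

    au := &configv1.ApisixUpstream{
        ObjectMeta: metav1.ObjectMeta{Name: "httpbin", Namespace: "default"},
        Spec: &configv1.ApisixUpstreamSpec{
            // Defaults for every port of the Service.
            ApisixUpstreamConfig: configv1.ApisixUpstreamConfig{Scheme: "http"},
            // Per-port overrides replace the old Ports/Port structure.
            PortLevelSettings: []configv1.PortLevelSettings{
                {
                    Port:                 50051,
                    ApisixUpstreamConfig: configv1.ApisixUpstreamConfig{Scheme: "grpc"},
                },
            },
        },
    }
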
diff --git a/pkg/kube/apisix/apis/config/v1/zz_generated.deepcopy.go b/pkg/kube/apisix/apis/config/v1/zz_generated.deepcopy.go
index 5ec8da8..82ccbb5 100644
--- a/pkg/kube/apisix/apis/config/v1/zz_generated.deepcopy.go
+++ b/pkg/kube/apisix/apis/config/v1/zz_generated.deepcopy.go
@@ -245,6 +245,27 @@ func (in *ApisixUpstream) DeepCopyObject() runtime.Object {
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ApisixUpstreamConfig) DeepCopyInto(out *ApisixUpstreamConfig) {
+ *out = *in
+ if in.LoadBalancer != nil {
+ in, out := &in.LoadBalancer, &out.LoadBalancer
+ *out = new(LoadBalancer)
+ **out = **in
+ }
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApisixUpstreamConfig.
+func (in *ApisixUpstreamConfig) DeepCopy() *ApisixUpstreamConfig {
+ if in == nil {
+ return nil
+ }
+ out := new(ApisixUpstreamConfig)
+ in.DeepCopyInto(out)
+ return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ApisixUpstreamList) DeepCopyInto(out *ApisixUpstreamList) {
*out = *in
out.TypeMeta = in.TypeMeta
@@ -280,9 +301,10 @@ func (in *ApisixUpstreamList) DeepCopyObject() runtime.Object {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ApisixUpstreamSpec) DeepCopyInto(out *ApisixUpstreamSpec) {
*out = *in
- if in.Ports != nil {
- in, out := &in.Ports, &out.Ports
- *out = make([]Port, len(*in))
+ in.ApisixUpstreamConfig.DeepCopyInto(&out.ApisixUpstreamConfig)
+ if in.PortLevelSettings != nil {
+ in, out := &in.PortLevelSettings, &out.PortLevelSettings
+ *out = make([]PortLevelSettings, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
@@ -398,22 +420,18 @@ func (in *Plugin) DeepCopy() *Plugin {
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
-func (in *Port) DeepCopyInto(out *Port) {
+func (in *PortLevelSettings) DeepCopyInto(out *PortLevelSettings) {
*out = *in
- if in.LoadBalancer != nil {
- in, out := &in.LoadBalancer, &out.LoadBalancer
- *out = new(LoadBalancer)
- **out = **in
- }
+ in.ApisixUpstreamConfig.DeepCopyInto(&out.ApisixUpstreamConfig)
return
}
-// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Port.
-func (in *Port) DeepCopy() *Port {
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PortLevelSettings.
+func (in *PortLevelSettings) DeepCopy() *PortLevelSettings {
if in == nil {
return nil
}
- out := new(Port)
+ out := new(PortLevelSettings)
in.DeepCopyInto(out)
return out
}
diff --git a/pkg/kube/translator.go b/pkg/kube/translator.go
new file mode 100644
index 0000000..226bccd
--- /dev/null
+++ b/pkg/kube/translator.go
@@ -0,0 +1,204 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package kube
+
+import (
+ "fmt"
+
+ corev1 "k8s.io/api/core/v1"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
+ listerscorev1 "k8s.io/client-go/listers/core/v1"
+
+	configv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
+	listersv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v1"
+	apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
+)
+
+const (
+ _defaultWeight = 100
+)
+
+type translateError struct {
+ field string
+ reason string
+}
+
+func (te *translateError) Error() string {
+ return fmt.Sprintf("%s: %s", te.field, te.reason)
+}
+
+// Translator translates Apisix* CRD resources to the description in APISIX.
+type Translator interface {
+	// TranslateUpstreamNodes translate Endpoints resources to APISIX Upstream nodes
+	// according to the give port.
+	TranslateUpstreamNodes(*corev1.Endpoints, int32) ([]apisixv1.UpstreamNode, error)
+	// TranslateUpstreamConfig translates ApisixUpstreamConfig (part of ApisixUpstream)
+	// to APISIX Upstream, it doesn't fill the the Upstream metadata and nodes.
+	TranslateUpstreamConfig(config *configv1.ApisixUpstreamConfig) (*apisixv1.Upstream, error)
+	// TranslateUpstream composes an upstream according to the
+	// given namespace, name (searching Service/Endpoints) and port (filtering Endpoints).
+ // The returned Upstream doesn't have metadata info.
+ TranslateUpstream(string, string, int32) (*apisixv1.Upstream, error)
+}
+
+// TranslatorOptions contains options to help Translator
+// work well.
+type TranslatorOptions struct {
+ EndpointsLister listerscorev1.EndpointsLister
+ ServiceLister listerscorev1.ServiceLister
+ ApisixUpstreamLister listersv1.ApisixUpstreamLister
+}
+
+type translator struct {
+ *TranslatorOptions
+}
+
+// NewTranslator initializes a APISIX CRD resources Translator.
+func NewTranslator(opts *TranslatorOptions) Translator {
+ return &translator{
+ TranslatorOptions: opts,
+ }
+}
+
+func (t *translator) TranslateUpstreamConfig(au *configv1.ApisixUpstreamConfig) (*apisixv1.Upstream, error) {
+ ups := apisixv1.NewDefaultUpstream()
+
+ if au.Scheme == "" {
+ au.Scheme = apisixv1.SchemeHTTP
+ } else {
+ switch au.Scheme {
+ case apisixv1.SchemeHTTP, apisixv1.SchemeGRPC:
+ ups.Scheme = au.Scheme
+ default:
+			return nil, &translateError{field: "scheme", reason: "invalid value"}
+ }
+ }
+
+ if au.LoadBalancer == nil || au.LoadBalancer.Type == "" {
+ ups.Type = apisixv1.LbRoundRobin
+ } else {
+ switch au.LoadBalancer.Type {
+		case apisixv1.LbRoundRobin, apisixv1.LbLeastConn, apisixv1.LbEwma:
+ ups.Type = au.LoadBalancer.Type
+ case apisixv1.LbConsistentHash:
+ ups.Type = au.LoadBalancer.Type
+ ups.Key = au.LoadBalancer.Key
+ switch au.LoadBalancer.HashOn {
+ case apisixv1.HashOnVars:
+ fallthrough
+ case apisixv1.HashOnHeader:
+ fallthrough
+ case apisixv1.HashOnCookie:
+ fallthrough
+ case apisixv1.HashOnConsumer:
+ fallthrough
+ case apisixv1.HashOnVarsCombination:
+ ups.HashOn = au.LoadBalancer.HashOn
+ default:
+				return nil, &translateError{field: "loadbalancer.hashOn", reason: "invalid value"}
+ }
+ default:
+ return nil, &translateError{
+ field: "loadbalancer.type",
+ reason: "invalid value",
+ }
+ }
+ }
+ return ups, nil
+}
+
+func (t *translator) TranslateUpstream(namespace, name string, port int32) (*apisixv1.Upstream, error) {
+ endpoints, err := t.EndpointsLister.Endpoints(namespace).Get(name)
+ if err != nil {
+ return nil, &translateError{
+ field: "endpoints",
+ reason: err.Error(),
+ }
+ }
+ nodes, err := t.TranslateUpstreamNodes(endpoints, port)
+ if err != nil {
+ return nil, err
+ }
+ ups := apisixv1.NewDefaultUpstream()
+ au, err := t.ApisixUpstreamLister.ApisixUpstreams(namespace).Get(name)
+ if err != nil {
+ if k8serrors.IsNotFound(err) {
+ ups.Nodes = nodes
+ return ups, nil
+ }
+ return nil, &translateError{
+ field: "ApisixUpstream",
+ reason: err.Error(),
+ }
+ }
+ upsCfg := &au.Spec.ApisixUpstreamConfig
+ for _, pls := range au.Spec.PortLevelSettings {
+ if pls.Port == port {
+ upsCfg = &pls.ApisixUpstreamConfig
+ break
+ }
+ }
+ ups, err = t.TranslateUpstreamConfig(upsCfg)
+ if err != nil {
+ return nil, err
+ }
+ ups.Nodes = nodes
+ return ups, nil
+}
+
+func (t *translator) TranslateUpstreamNodes(endpoints *corev1.Endpoints, port int32) ([]apisixv1.UpstreamNode, error) {
+	svc, err := t.ServiceLister.Services(endpoints.Namespace).Get(endpoints.Name)
+ if err != nil {
+ return nil, &translateError{
+ field: "service",
+ reason: err.Error(),
+ }
+ }
+
+ var svcPort *corev1.ServicePort
+ for _, exposePort := range svc.Spec.Ports {
+ if exposePort.Port == port {
+ svcPort = &exposePort
+ break
+ }
+ }
+ if svcPort == nil {
+ return nil, &translateError{
+ field: "service.spec.ports",
+ reason: "port not defined",
+ }
+ }
+ var nodes []apisixv1.UpstreamNode
+ for _, subset := range endpoints.Subsets {
+ var epPort *corev1.EndpointPort
+ for _, port := range subset.Ports {
+ if port.Name == svcPort.Name {
+ epPort = &port
+ break
+ }
+ }
+ if epPort != nil {
+ for _, addr := range subset.Addresses {
+ nodes = append(nodes, apisixv1.UpstreamNode{
+ IP: addr.IP,
+ Port: int(epPort.Port),
+ // FIXME Custom node weight
+ Weight: _defaultWeight,
+ })
+ }
+ }
+ }
+ return nodes, nil
+}
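
A rough usage sketch of the new Translator; the wrapper function and literal values below are illustrative and not part of this commit, while the listers are the ones wired up in controller.go:

    package example // illustrative only

    import (
        listerscorev1 "k8s.io/client-go/listers/core/v1"

        "github.com/apache/apisix-ingress-controller/pkg/kube"
        listersv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v1"
        apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
    )

    // buildUpstream wires the shared listers into a Translator and asks it for
    // an upstream for service default/httpbin on service port 80.
    func buildUpstream(
        epLister listerscorev1.EndpointsLister,
        svcLister listerscorev1.ServiceLister,
        auLister listersv1.ApisixUpstreamLister,
    ) (*apisixv1.Upstream, error) {
        tr := kube.NewTranslator(&kube.TranslatorOptions{
            EndpointsLister:      epLister,
            ServiceLister:        svcLister,
            ApisixUpstreamLister: auLister,
        })
        // Nodes come from Endpoints; scheme and load balancer come from the
        // ApisixUpstream resource with the same namespace/name, if it exists.
        ups, err := tr.TranslateUpstream("default", "httpbin", 80)
        if err != nil {
            return nil, err
        }
        // Metadata (FullName, Group, Name, ResourceVersion) is left to the
        // caller, as route.go does above.
        return ups, nil
    }
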
diff --git a/pkg/kube/translator_test.go b/pkg/kube/translator_test.go
new file mode 100644
index 0000000..743fa7d
--- /dev/null
+++ b/pkg/kube/translator_test.go
@@ -0,0 +1,214 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package kube
+
+import (
+ "context"
+ "testing"
+
+	apisixv1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
+ "github.com/stretchr/testify/assert"
+ corev1 "k8s.io/api/core/v1"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/util/intstr"
+ "k8s.io/client-go/informers"
+ "k8s.io/client-go/kubernetes/fake"
+ "k8s.io/client-go/tools/cache"
+
+	configv1 "github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
+)
+
+func TestTranslateUpstreamConfig(t *testing.T) {
+ tr := &translator{}
+
+ au := &configv1.ApisixUpstreamConfig{
+ LoadBalancer: nil,
+ Scheme: apisixv1.SchemeGRPC,
+ }
+
+ ups, err := tr.TranslateUpstreamConfig(au)
+ assert.Nil(t, err, "checking upstream config translating")
+ assert.Equal(t, ups.Type, apisixv1.LbRoundRobin)
+ assert.Equal(t, ups.Scheme, apisixv1.SchemeGRPC)
+
+ au = &configv1.ApisixUpstreamConfig{
+ LoadBalancer: &configv1.LoadBalancer{
+ Type: apisixv1.LbConsistentHash,
+ HashOn: apisixv1.HashOnHeader,
+ Key: "user-agent",
+ },
+ Scheme: apisixv1.SchemeHTTP,
+ }
+ ups, err = tr.TranslateUpstreamConfig(au)
+ assert.Nil(t, err, "checking upstream config translating")
+ assert.Equal(t, ups.Type, apisixv1.LbConsistentHash)
+ assert.Equal(t, ups.Key, "user-agent")
+ assert.Equal(t, ups.HashOn, apisixv1.HashOnHeader)
+ assert.Equal(t, ups.Scheme, apisixv1.SchemeHTTP)
+
+ au = &configv1.ApisixUpstreamConfig{
+ LoadBalancer: &configv1.LoadBalancer{
+ Type: apisixv1.LbConsistentHash,
+ HashOn: apisixv1.HashOnHeader,
+ Key: "user-agent",
+ },
+ Scheme: "dns",
+ }
+ _, err = tr.TranslateUpstreamConfig(au)
+ assert.Error(t, err, &translateError{
+ field: "scheme",
+ reason: "invalid value",
+ })
+
+ au = &configv1.ApisixUpstreamConfig{
+ LoadBalancer: &configv1.LoadBalancer{
+ Type: "hash",
+ },
+ }
+ _, err = tr.TranslateUpstreamConfig(au)
+ assert.Error(t, err, &translateError{
+ field: "loadbalancer.type",
+ reason: "invalid value",
+ })
+
+ au = &configv1.ApisixUpstreamConfig{
+ LoadBalancer: &configv1.LoadBalancer{
+ Type: apisixv1.LbConsistentHash,
+ HashOn: "arg",
+ },
+ }
+ _, err = tr.TranslateUpstreamConfig(au)
+ assert.Error(t, err, &translateError{
+ field: "loadbalancer.hashOn",
+ reason: "invalid value",
+ })
+}
+
+func TestTranslateUpstreamNodes(t *testing.T) {
+ svc := &corev1.Service{
+ TypeMeta: metav1.TypeMeta{},
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "svc",
+ Namespace: "test",
+ },
+ Spec: corev1.ServiceSpec{
+ Ports: []corev1.ServicePort{
+ {
+ Name: "port1",
+ Port: 80,
+ TargetPort: intstr.IntOrString{
+ Type: intstr.Int,
+ IntVal: 9080,
+ },
+ },
+ {
+ Name: "port2",
+ Port: 443,
+ TargetPort: intstr.IntOrString{
+ Type: intstr.Int,
+ IntVal: 9443,
+ },
+ },
+ },
+ },
+ }
+ endpoints := &corev1.Endpoints{
+ TypeMeta: metav1.TypeMeta{},
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "svc",
+ Namespace: "test",
+ },
+ Subsets: []corev1.EndpointSubset{
+ {
+ Ports: []corev1.EndpointPort{
+ {
+ Name: "port1",
+ Port: 9080,
+ },
+ {
+ Name: "port2",
+ Port: 9443,
+ },
+ },
+ Addresses: []corev1.EndpointAddress{
+ {IP: "192.168.1.1"},
+ {IP: "192.168.1.2"},
+ },
+ },
+ },
+ }
+
+ client := fake.NewSimpleClientset()
+ informersFactory := informers.NewSharedInformerFactory(client, 0)
+ svcInformer := informersFactory.Core().V1().Services().Informer()
+ svcLister := informersFactory.Core().V1().Services().Lister()
+
+ processCh := make(chan struct{})
+ svcInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+ AddFunc: func(obj interface{}) {
+ processCh <- struct{}{}
+ },
+ })
+
+ stopCh := make(chan struct{})
+ defer close(stopCh)
+ go svcInformer.Run(stopCh)
+ cache.WaitForCacheSync(stopCh, svcInformer.HasSynced)
+
+	_, err := client.CoreV1().Services("test").Create(context.Background(), svc, metav1.CreateOptions{})
+ assert.Nil(t, err)
+
+ tr := &translator{&TranslatorOptions{
+ ServiceLister: svcLister,
+ }}
+ <-processCh
+
+ nodes, err := tr.TranslateUpstreamNodes(endpoints, 10080)
+ assert.Nil(t, nodes)
+ assert.Equal(t, err, &translateError{
+ field: "service.spec.ports",
+ reason: "port not defined",
+ })
+
+ nodes, err = tr.TranslateUpstreamNodes(endpoints, 80)
+ assert.Nil(t, err)
+ assert.Equal(t, nodes, []apisixv1.UpstreamNode{
+ {
+ IP: "192.168.1.1",
+ Port: 9080,
+ Weight: 100,
+ },
+ {
+ IP: "192.168.1.2",
+ Port: 9080,
+ Weight: 100,
+ },
+ })
+
+ nodes, err = tr.TranslateUpstreamNodes(endpoints, 443)
+ assert.Nil(t, err)
+ assert.Equal(t, nodes, []apisixv1.UpstreamNode{
+ {
+ IP: "192.168.1.1",
+ Port: 9443,
+ Weight: 100,
+ },
+ {
+ IP: "192.168.1.2",
+ Port: 9443,
+ Weight: 100,
+ },
+ })
+}
diff --git a/pkg/types/apisix/v1/types.go b/pkg/types/apisix/v1/types.go
index a249ed2..549e39f 100644
--- a/pkg/types/apisix/v1/types.go
+++ b/pkg/types/apisix/v1/types.go
@@ -15,7 +15,9 @@
package v1
import (
+ "bytes"
"encoding/json"
+ "strconv"
)
const (
@@ -40,6 +42,11 @@ const (
LbEwma = "ewma"
// LbLeaseConn is the least connection load balancer.
LbLeastConn = "least_conn"
+
+ // SchemeHTTP represents the HTTP protocol.
+ SchemeHTTP = "http"
+ // SchemeGRPC represents the GRPC protocol.
+ SchemeGRPC = "grpc"
)
// Metadata contains all meta information about resources.
@@ -101,17 +108,17 @@ type Service struct {
type Upstream struct {
Metadata `json:",inline" yaml:",inline"`
- Type string `json:"type,omitempty" yaml:"type,omitempty"`
- HashOn string `json:"hash_on,omitemtpy" yaml:"hash_on,omitempty"`
- Key string `json:"key,omitempty" yaml:"key,omitempty"`
- Nodes []Node `json:"nodes,omitempty" yaml:"nodes,omitempty"`
- FromKind string `json:"from_kind,omitempty" yaml:"from_kind,omitempty"`
- Scheme string `json:"scheme,omitempty" yaml:"scheme,omitempty"`
+ Type string `json:"type,omitempty" yaml:"type,omitempty"`
+	HashOn   string         `json:"hash_on,omitemtpy" yaml:"hash_on,omitempty"`
+	Key      string         `json:"key,omitempty" yaml:"key,omitempty"`
+	Nodes    []UpstreamNode `json:"nodes,omitempty" yaml:"nodes,omitempty"`
+	FromKind string         `json:"from_kind,omitempty" yaml:"from_kind,omitempty"`
+	Scheme   string         `json:"scheme,omitempty" yaml:"scheme,omitempty"`
}
// Node the node in upstream
// +k8s:deepcopy-gen=true
-type Node struct {
+type UpstreamNode struct {
IP string `json:"ip,omitempty" yaml:"ip,omitempty"`
Port int `json:"port,omitempty" yaml:"port,omitempty"`
Weight int `json:"weight,omitempty" yaml:"weight,omitempty"`
@@ -128,3 +135,32 @@ type Ssl struct {
Status int `json:"status,omitempty" yaml:"status,omitempty"`
Group string `json:"group,omitempty" yaml:"group,omitempty"`
}
+
+// NewDefaultUpstream returns an empty Upstream with default values.
+func NewDefaultUpstream() *Upstream {
+ return &Upstream{
+ Type: LbRoundRobin,
+ Key: "",
+ Nodes: nil,
+ FromKind: "",
+ Scheme: SchemeHTTP,
+ }
+}
+
+// ComposeUpstreamName uses namespace, name and port info to compose
+// the upstream name.
+func ComposeUpstreamName(namespace, name string, port int32) string {
+ pstr := strconv.Itoa(int(port))
+ // FIXME Use sync.Pool to reuse this buffer if the upstream
+ // name composing code path is hot.
+ p := make([]byte, 0, len(namespace)+len(name)+len(pstr)+2)
+ buf := bytes.NewBuffer(p)
+
+ buf.WriteString(namespace)
+ buf.WriteByte('_')
+ buf.WriteString(name)
+ buf.WriteByte('_')
+ buf.WriteString(pstr)
+
+ return buf.String()
+}
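
A quick illustration of the two helpers added above, assuming the apisixv1 alias used elsewhere in this commit; the values are examples:

    ups := apisixv1.NewDefaultUpstream() // Type "roundrobin", Scheme "http", no nodes yet
    ups.Name = apisixv1.ComposeUpstreamName("default", "httpbin", 8080)
    // ups.Name == "default_httpbin_8080", the namespace_name_port convention
    // used by the endpoints and ApisixUpstream controllers.
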
diff --git a/pkg/types/apisix/v1/zz_generated.deepcopy.go b/pkg/types/apisix/v1/zz_generated.deepcopy.go
index d8d239b..78ee480 100644
--- a/pkg/types/apisix/v1/zz_generated.deepcopy.go
+++ b/pkg/types/apisix/v1/zz_generated.deepcopy.go
@@ -21,22 +21,6 @@ limitations under the License.
package v1
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
-func (in *Node) DeepCopyInto(out *Node) {
- *out = *in
- return
-}
-
-// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Node.
-func (in *Node) DeepCopy() *Node {
- if in == nil {
- return nil
- }
- out := new(Node)
- in.DeepCopyInto(out)
- return out
-}
-
-// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Route) DeepCopyInto(out *Route) {
*out = *in
out.Metadata = in.Metadata
@@ -103,7 +87,7 @@ func (in *Upstream) DeepCopyInto(out *Upstream) {
out.Metadata = in.Metadata
if in.Nodes != nil {
in, out := &in.Nodes, &out.Nodes
- *out = make([]Node, len(*in))
+ *out = make([]UpstreamNode, len(*in))
copy(*out, *in)
}
return
@@ -118,3 +102,19 @@ func (in *Upstream) DeepCopy() *Upstream {
in.DeepCopyInto(out)
return out
}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *UpstreamNode) DeepCopyInto(out *UpstreamNode) {
+ *out = *in
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UpstreamNode.
+func (in *UpstreamNode) DeepCopy() *UpstreamNode {
+ if in == nil {
+ return nil
+ }
+ out := new(UpstreamNode)
+ in.DeepCopyInto(out)
+ return out
+}
diff --git a/pkg/types/event.go b/pkg/types/event.go
index 7535ba5..1d68553 100644
--- a/pkg/types/event.go
+++ b/pkg/types/event.go
@@ -42,6 +42,11 @@ func (ev EventType) String() string {
// Event represents a typed event.
type Event struct {
- Type EventType
+ // Type is the type of event.
+ Type EventType
+ // Object is the event subject.
Object interface{}
+ // Tombstone is the final state before object was delete,
+ // it's useful for DELETE event.
+ Tombstone interface{}
}
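
A small sketch of how the delete handlers above use the new field; au stands for the last ApisixUpstream state captured in OnDelete:

    ev := &types.Event{
        Type:      types.EventDelete,
        Object:    key, // namespace/name key of the deleted resource
        Tombstone: au,  // consumed by sync() when the lister lookup misses
    }
    c.workqueue.AddRateLimited(ev)
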
diff --git a/test/e2e/endpoints/endpoints.go b/test/e2e/endpoints/endpoints.go
index 16e30cf..86a1ff2 100644
--- a/test/e2e/endpoints/endpoints.go
+++ b/test/e2e/endpoints/endpoints.go
@@ -26,26 +26,31 @@ import (
var _ = ginkgo.Describe("endpoints", func() {
s := scaffold.NewDefaultScaffold()
-	ginkgo.It("ignore applied only if there is an ApisixUpstream referenced", func() {
+	ginkgo.It("ignore applied only if there is an ApisixRoute referenced", func() {
		time.Sleep(5 * time.Second)
		assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixUpstreamsCreated(0), "checking number of upstreams")
backendSvc, backendSvcPort := s.DefaultHTTPBackend()
ups := fmt.Sprintf(`
apiVersion: apisix.apache.org/v1
-kind: ApisixUpstream
+kind: ApisixRoute
metadata:
- name: %s
+ name: httpbin-route
spec:
- ports:
- - port: %d
- loadbalancer:
- type: roundrobin
+ rules:
+ - host: httpbin.org
+ http:
+ paths:
+ - backend:
+ serviceName: %s
+ servicePort: %d
+ path: /ip
`, backendSvc, backendSvcPort[0])
assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(ups))
		assert.Nil(ginkgo.GinkgoT(), s.EnsureNumApisixUpstreamsCreated(1), "checking number of upstreams")
})
	ginkgo.It("upstream nodes should be reset to empty when Service/Endpoints was deleted", func() {
+ ginkgo.Skip("now we don't handle endpoints delete event")
backendSvc, backendSvcPort := s.DefaultHTTPBackend()
apisixRoute := fmt.Sprintf(`
apiVersion: apisix.apache.org/v1
@@ -72,3 +77,54 @@ spec:
		s.NewAPISIXClient().GET("/ip").WithHeader("Host", "httpbin.com").Expect().Status(http.StatusServiceUnavailable)
})
})
+
+var _ = ginkgo.Describe("port usage", func() {
+ s := scaffold.NewScaffold(&scaffold.Options{
+ Name: "endpoints-port",
+ Kubeconfig: scaffold.GetKubeconfig(),
+ APISIXConfigPath: "testdata/apisix-gw-config.yaml",
+		APISIXDefaultConfigPath: "testdata/apisix-gw-config-default.yaml",
+		IngressAPISIXReplicas:   1,
+		HTTPBinServicePort:      8080, // use service port which is different from target port (80)
+ })
+ ginkgo.It("service port != target port", func() {
+ backendSvc, backendSvcPort := s.DefaultHTTPBackend()
+ apisixRoute := fmt.Sprintf(`
+apiVersion: apisix.apache.org/v1
+kind: ApisixRoute
+metadata:
+ name: httpbin-route
+spec:
+ rules:
+ - host: httpbin.com
+ http:
+ paths:
+ - backend:
+ serviceName: %s
+ servicePort: %d
+ path: /ip
+`, backendSvc, backendSvcPort[0])
+		assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(apisixRoute))
+ time.Sleep(3 * time.Second)
+ ups, err := s.ListApisixUpstreams()
+ assert.Nil(ginkgo.GinkgoT(), err, "listing APISIX upstreams")
+ assert.Len(ginkgo.GinkgoT(), ups, 1)
+ assert.Len(ginkgo.GinkgoT(), ups[0].Nodes, 1)
+
+ // port in nodes is still the targetPort, not the service port
+ assert.Equal(ginkgo.GinkgoT(), ups[0].Nodes[0].Port, 80)
+
+		// scale HTTPBIN, so the endpoints controller has the opportunity to update upstream.
+ assert.Nil(ginkgo.GinkgoT(), s.ScaleHTTPBIN(3))
+ time.Sleep(10 * time.Second)
+ ups, err = s.ListApisixUpstreams()
+ assert.Nil(ginkgo.GinkgoT(), err, "listing APISIX upstreams")
+ assert.Len(ginkgo.GinkgoT(), ups, 1)
+ assert.Len(ginkgo.GinkgoT(), ups[0].Nodes, 3)
+
+ // port in nodes is still the targetPort, not the service port
+ assert.Equal(ginkgo.GinkgoT(), ups[0].Nodes[0].Port, 80)
+ assert.Equal(ginkgo.GinkgoT(), ups[0].Nodes[1].Port, 80)
+ assert.Equal(ginkgo.GinkgoT(), ups[0].Nodes[2].Port, 80)
+ })
+})
diff --git a/test/e2e/features/scheme.go b/test/e2e/features/scheme.go
index 6e50e2c..bdcd529 100644
--- a/test/e2e/features/scheme.go
+++ b/test/e2e/features/scheme.go
@@ -59,15 +59,10 @@ kind: ApisixUpstream
metadata:
name: grpc-server-service
spec:
- ports:
+ portLevelSettings:
- port: 50051
scheme: grpc
`))
- time.Sleep(2 * time.Second)
- ups, err := s.ListApisixUpstreams()
- assert.Nil(ginkgo.GinkgoT(), err)
- assert.Len(ginkgo.GinkgoT(), ups, 1)
-
err = s.CreateResourceFromString(`
apiVersion: apisix.apache.org/v1
kind: ApisixRoute
@@ -84,6 +79,11 @@ spec:
path: /helloworld.Greeter/SayHello
`)
assert.Nil(ginkgo.GinkgoT(), err)
+ time.Sleep(2 * time.Second)
+ ups, err := s.ListApisixUpstreams()
+ assert.Nil(ginkgo.GinkgoT(), err)
+ assert.Len(ginkgo.GinkgoT(), ups, 1)
+ assert.Equal(ginkgo.GinkgoT(), ups[0].Scheme, "grpc")
		// TODO enable the following test cases once APISIX supports HTTP/2 in plain.
//ep, err := s.GetAPISIXEndpoint()
diff --git a/test/e2e/scaffold/scaffold.go b/test/e2e/scaffold/scaffold.go
index ecdb83f..5ab61c8 100644
--- a/test/e2e/scaffold/scaffold.go
+++ b/test/e2e/scaffold/scaffold.go
@@ -45,6 +45,7 @@ type Options struct {
APISIXConfigPath string
APISIXDefaultConfigPath string
IngressAPISIXReplicas int
+ HTTPBinServicePort int
}
type Scaffold struct {
@@ -106,6 +107,7 @@ func NewDefaultScaffold() *Scaffold {
APISIXConfigPath: "testdata/apisix-gw-config.yaml",
		APISIXDefaultConfigPath: "testdata/apisix-gw-config-default.yaml",
IngressAPISIXReplicas: 1,
+ HTTPBinServicePort: 80,
}
return NewScaffold(opts)
}