This is an automated email from the ASF dual-hosted git repository.

tokers pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ca5075  chore: new apisix upstream controller (#236)
8ca5075 is described below

commit 8ca50759480940120328bf99bc476e19d9599c39
Author: Alex Zhang <[email protected]>
AuthorDate: Sun Feb 7 13:57:02 2021 +0800

    chore: new apisix upstream controller (#236)
---
 pkg/apisix/resource.go                             |   4 +-
 pkg/apisix/upstream_test.go                        |   2 +-
 pkg/ingress/apisix/route.go                        |  25 +-
 pkg/ingress/apisix/upstream.go                     | 114 --------
 pkg/ingress/apisix/upstream_test.go                | 111 -------
 pkg/ingress/controller/apisix_route.go             |   8 +-
 pkg/ingress/controller/apisix_upstream.go          | 320 ++++++++++++---------
 pkg/ingress/controller/controller.go               |  54 ++--
 pkg/ingress/controller/endpoint.go                 |  72 ++---
 pkg/ingress/endpoint/ep.go                         |  61 ----
 pkg/kube/apisix/apis/config/v1/types.go            |  41 ++-
 .../apisix/apis/config/v1/zz_generated.deepcopy.go |  42 ++-
 pkg/kube/translator.go                             | 204 +++++++++++++
 pkg/kube/translator_test.go                        | 214 ++++++++++++++
 pkg/types/apisix/v1/types.go                       |  50 +++-
 pkg/types/apisix/v1/zz_generated.deepcopy.go       |  34 +--
 pkg/types/event.go                                 |   7 +-
 test/e2e/endpoints/endpoints.go                    |  70 ++++-
 test/e2e/features/scheme.go                        |  12 +-
 test/e2e/scaffold/scaffold.go                      |   2 +
 20 files changed, 882 insertions(+), 565 deletions(-)

diff --git a/pkg/apisix/resource.go b/pkg/apisix/resource.go
index 19aa87d..cb0f3a3 100644
--- a/pkg/apisix/resource.go
+++ b/pkg/apisix/resource.go
@@ -126,9 +126,9 @@ func (i *item) upstream(clusterName string) (*v1.Upstream, 
error) {
                return nil, err
        }
 
-       var nodes []v1.Node
+       var nodes []v1.UpstreamNode
        for _, node := range ups.Nodes {
-               nodes = append(nodes, v1.Node{
+               nodes = append(nodes, v1.UpstreamNode{
                        IP:     node.Host,
                        Port:   node.Port,
                        Weight: node.Weight,
diff --git a/pkg/apisix/upstream_test.go b/pkg/apisix/upstream_test.go
index d287b59..9fa7045 100644
--- a/pkg/apisix/upstream_test.go
+++ b/pkg/apisix/upstream_test.go
@@ -165,7 +165,7 @@ func TestUpstreamClient(t *testing.T) {
        ip := "10.0.11.153"
        port := 15006
        weight := 100
-       nodes := []v1.Node{
+       nodes := []v1.UpstreamNode{
                {
                        IP:     ip,
                        Port:   port,
diff --git a/pkg/ingress/apisix/route.go b/pkg/ingress/apisix/route.go
index 64d57d5..2487657 100644
--- a/pkg/ingress/apisix/route.go
+++ b/pkg/ingress/apisix/route.go
@@ -17,7 +17,7 @@ package apisix
 import (
        "strconv"
 
-       "github.com/apache/apisix-ingress-controller/pkg/ingress/endpoint"
+       "github.com/apache/apisix-ingress-controller/pkg/kube"
        configv1 
"github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
        "github.com/apache/apisix-ingress-controller/pkg/seven/conf"
        apisix "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
@@ -37,7 +37,7 @@ const (
 type ApisixRoute configv1.ApisixRoute
 
 // Convert convert to  apisix.Route from ingress.ApisixRoute CRD
-func (ar *ApisixRoute) Convert() ([]*apisix.Route, []*apisix.Service, 
[]*apisix.Upstream, error) {
+func (ar *ApisixRoute) Convert(translator kube.Translator) ([]*apisix.Route, 
[]*apisix.Service, []*apisix.Upstream, error) {
        ns := ar.Namespace
        // meta annotation
        plugins, group := BuildAnnotation(ar.Annotations)
@@ -125,20 +125,15 @@ func (ar *ApisixRoute) Convert() ([]*apisix.Route, 
[]*apisix.Service, []*apisix.
                        if group != "" {
                                fullUpstreamName = group + "_" + 
apisixUpstreamName
                        }
-                       LBType := DefaultLBType
-                       port, _ := strconv.Atoi(svcPort)
-                       nodes := endpoint.BuildEps(ns, svcName, port)
-                       upstream := &apisix.Upstream{
-                               Metadata: apisix.Metadata{
-                                       FullName:        fullUpstreamName,
-                                       Group:           group,
-                                       ResourceVersion: rv,
-                                       Name:            apisixUpstreamName,
-                               },
-                               Type:  LBType,
-                               Nodes: nodes,
+                       ups, err := translator.TranslateUpstream(ns, svcName, 
int32(p.Backend.ServicePort))
+                       if err != nil {
+                               return nil, nil, nil, err
                        }
-                       upstreamMap[upstream.FullName] = upstream
+                       ups.FullName = fullUpstreamName
+                       ups.Group = group
+                       ups.ResourceVersion = rv
+                       ups.Name = apisixUpstreamName
+                       upstreamMap[ups.FullName] = ups
                }
        }
        for _, s := range serviceMap {
diff --git a/pkg/ingress/apisix/upstream.go b/pkg/ingress/apisix/upstream.go
deleted file mode 100644
index a5970e6..0000000
--- a/pkg/ingress/apisix/upstream.go
+++ /dev/null
@@ -1,114 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package apisix
-
-import (
-       "errors"
-       "fmt"
-       "strconv"
-
-       "github.com/apache/apisix-ingress-controller/pkg/ingress/endpoint"
-       configv1 
"github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
-       "github.com/apache/apisix-ingress-controller/pkg/seven/conf"
-       apisix "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
-)
-
-const (
-       ApisixUpstream = "ApisixUpstream"
-)
-
-//type ApisixUpstreamCRD ingress.ApisixUpstream
-
-type ApisixUpstreamBuilder struct {
-       CRD *configv1.ApisixUpstream
-       Ep  endpoint.Endpoint
-}
-
-// Convert convert to  apisix.Route from ingress.ApisixRoute CRD
-func (aub *ApisixUpstreamBuilder) Convert() ([]*apisix.Upstream, error) {
-       ar := aub.CRD
-       ns := ar.Namespace
-       name := ar.Name
-       // meta annotation
-       _, group := BuildAnnotation(ar.Annotations)
-       conf.AddGroup(group)
-
-       upstreams := make([]*apisix.Upstream, 0)
-       rv := ar.ObjectMeta.ResourceVersion
-       Ports := ar.Spec.Ports
-       for _, r := range Ports {
-               if r.Scheme != "" && r.Scheme != configv1.SchemeHTTP && 
r.Scheme != configv1.SchemeGRPC {
-                       return nil, fmt.Errorf("bad scheme %s", r.Scheme)
-               }
-
-               port := r.Port
-               // apisix route name = namespace_svcName_svcPort = apisix 
service name
-               apisixUpstreamName := ns + "_" + name + "_" + 
strconv.Itoa(int(port))
-
-               lb := r.LoadBalancer
-
-               //nodes := endpoint.BuildEps(ns, name, int(port))
-               nodes := aub.Ep.BuildEps(ns, name, port)
-               fromKind := ApisixUpstream
-
-               // fullName
-               fullName := apisixUpstreamName
-               if group != "" {
-                       fullName = group + "_" + apisixUpstreamName
-               }
-               upstream := &apisix.Upstream{
-                       Metadata: apisix.Metadata{
-                               FullName:        fullName,
-                               Group:           group,
-                               ResourceVersion: rv,
-                               Name:            apisixUpstreamName,
-                       },
-                       Nodes:    nodes,
-                       FromKind: fromKind,
-               }
-               if r.Scheme != "" {
-                       upstream.Scheme = r.Scheme
-               }
-               if lb == nil || lb.Type == "" {
-                       upstream.Type = apisix.LbRoundRobin
-               } else {
-                       switch lb.Type {
-                       case apisix.LbRoundRobin, apisix.LbLeastConn, 
apisix.LbEwma:
-                               upstream.Type = lb.Type
-                       case apisix.LbConsistentHash:
-                               upstream.Type = lb.Type
-                               upstream.Key = lb.Key
-                               switch lb.HashOn {
-                               case apisix.HashOnVars:
-                                       fallthrough
-                               case apisix.HashOnHeader:
-                                       fallthrough
-                               case apisix.HashOnCookie:
-                                       fallthrough
-                               case apisix.HashOnConsumer:
-                                       fallthrough
-                               case apisix.HashOnVarsCombination:
-                                       upstream.HashOn = lb.HashOn
-                               default:
-                                       return nil, errors.New("invalid hashOn 
value")
-                               }
-                       default:
-                               return nil, errors.New("invalid load balancer 
type")
-                       }
-               }
-               upstreams = append(upstreams, upstream)
-       }
-       return upstreams, nil
-}
diff --git a/pkg/ingress/apisix/upstream_test.go 
b/pkg/ingress/apisix/upstream_test.go
deleted file mode 100644
index 548d18c..0000000
--- a/pkg/ingress/apisix/upstream_test.go
+++ /dev/null
@@ -1,111 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package apisix
-
-import (
-       "fmt"
-       "testing"
-
-       "github.com/stretchr/testify/assert"
-       "gopkg.in/yaml.v2"
-
-       configv1 
"github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
-       v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
-)
-
-func TestApisixUpstreamCRD_Convert(t *testing.T) {
-       assert := assert.New(t)
-
-       // get yaml from string
-       var crd configv1.ApisixUpstream
-       bytes := []byte(upstreamYaml)
-       if err := yaml.Unmarshal(bytes, &crd); err != nil {
-               assert.Error(err)
-       } else {
-               au3 := &ApisixUpstreamBuilder{CRD: &crd, Ep: 
&EndpointRequestTest{}} // mock endpoints
-               // convert
-               if upstreams, err := au3.Convert(); err != nil {
-                       assert.Error(err)
-               } else {
-                       // equals or deepCompare
-                       upstreamExpect := buildExpectUpstream()
-                       //upstreamsExpect := []*v1.Upstream{upstreamExpect}
-                       b := equals(upstreams[0], upstreamExpect)
-                       //b := reflect.DeepEqual(upstreams, 
[]*v1.Upstream{upstreamExpect})
-                       if !b {
-                               assert.True(b, "convert upstream not expected")
-                               assert.Error(fmt.Errorf("convert upstream not 
expect"))
-                       }
-                       t.Log("[upstream convert] ok")
-               }
-       }
-}
-
-func equals(s, d *v1.Upstream) bool {
-       if s.Name != d.Name || s.FullName != d.FullName || s.Group != d.Group {
-               return false
-       }
-
-       if s.FromKind != d.FromKind || s.Type != d.Type || s.Key != d.Key || 
s.HashOn != d.HashOn {
-               return false
-       }
-
-       return true
-}
-
-// mock BuildEps
-type EndpointRequestTest struct{}
-
-func (epr *EndpointRequestTest) BuildEps(ns, name string, port int) []v1.Node {
-       nodes := make([]v1.Node, 0)
-       return nodes
-}
-
-func buildExpectUpstream() *v1.Upstream {
-       fullName := "cloud_httpserver_8080"
-       LBType := "chash"
-       HashOn := "header"
-       Key := "hello_key"
-       fromKind := "ApisixUpstream"
-       group := ""
-       upstreamExpect := &v1.Upstream{
-               Metadata: v1.Metadata{
-                       Group:           group,
-                       ResourceVersion: group,
-                       FullName:        fullName,
-                       Name:            fullName,
-               },
-               Type:     LBType,
-               HashOn:   HashOn,
-               Key:      Key,
-               FromKind: fromKind,
-       }
-       return upstreamExpect
-}
-
-var upstreamYaml = `
-kind: ApisixUpstream
-apiVersion: apisix.apache.org/v1
-metadata:
-  name: httpserver
-  namespace: cloud
-spec:
-  ports:
-  - loadbalancer:
-      hashOn: header
-      key: hello_key
-      type: chash
-    port: 8080
-`
diff --git a/pkg/ingress/controller/apisix_route.go 
b/pkg/ingress/controller/apisix_route.go
index fca49f7..f629d35 100644
--- a/pkg/ingress/controller/apisix_route.go
+++ b/pkg/ingress/controller/apisix_route.go
@@ -218,7 +218,7 @@ func (c *ApisixRouteController) add(key string) error {
                return err
        }
        apisixRoute := apisix.ApisixRoute(*apisixIngressRoute)
-       routes, services, upstreams, _ := apisixRoute.Convert()
+       routes, services, upstreams, _ := 
apisixRoute.Convert(c.controller.translator)
        comb := state.ApisixCombination{Routes: routes, Services: services, 
Upstreams: upstreams}
        _, err = comb.Solver()
        return err
@@ -246,10 +246,10 @@ func (c *ApisixRouteController) sync(rqo *RouteQueueObj) 
error {
                        return err // if error occurred, return
                }
                oldApisixRoute := apisix.ApisixRoute(*rqo.OldObj)
-               oldRoutes, _, _, _ := oldApisixRoute.Convert()
+               oldRoutes, _, _, _ := 
oldApisixRoute.Convert(c.controller.translator)
 
                newApisixRoute := apisix.ApisixRoute(*apisixIngressRoute)
-               newRoutes, _, _, _ := newApisixRoute.Convert()
+               newRoutes, _, _, _ := 
newApisixRoute.Convert(c.controller.translator)
 
                rc := &state.RouteCompare{OldRoutes: oldRoutes, NewRoutes: 
newRoutes}
                return rc.Sync()
@@ -260,7 +260,7 @@ func (c *ApisixRouteController) sync(rqo *RouteQueueObj) 
error {
                        return nil
                }
                apisixRoute := apisix.ApisixRoute(*rqo.OldObj)
-               routes, services, upstreams, _ := apisixRoute.Convert()
+               routes, services, upstreams, _ := 
apisixRoute.Convert(c.controller.translator)
                rc := &state.RouteCompare{OldRoutes: routes, NewRoutes: nil}
                if err := rc.Sync(); err != nil {
                        return err
diff --git a/pkg/ingress/controller/apisix_upstream.go 
b/pkg/ingress/controller/apisix_upstream.go
index 498919e..eed800a 100644
--- a/pkg/ingress/controller/apisix_upstream.go
+++ b/pkg/ingress/controller/apisix_upstream.go
@@ -15,202 +15,244 @@
 package controller
 
 import (
-       "fmt"
-       "time"
-
-       "k8s.io/apimachinery/pkg/api/errors"
-       "k8s.io/apimachinery/pkg/util/runtime"
-       "k8s.io/apimachinery/pkg/util/wait"
-       "k8s.io/client-go/kubernetes"
-       "k8s.io/client-go/kubernetes/scheme"
+       "context"
+
+       "go.uber.org/zap"
+       k8serrors "k8s.io/apimachinery/pkg/api/errors"
        "k8s.io/client-go/tools/cache"
        "k8s.io/client-go/util/workqueue"
 
-       "github.com/apache/apisix-ingress-controller/pkg/ingress/apisix"
-       "github.com/apache/apisix-ingress-controller/pkg/ingress/endpoint"
+       apisixcache 
"github.com/apache/apisix-ingress-controller/pkg/apisix/cache"
        configv1 
"github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
-       clientset 
"github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/clientset/versioned"
-       apisixscheme 
"github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/clientset/versioned/scheme"
-       informersv1 
"github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/informers/externalversions/config/v1"
-       listersv1 
"github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v1"
        "github.com/apache/apisix-ingress-controller/pkg/log"
-       "github.com/apache/apisix-ingress-controller/pkg/seven/state"
+       "github.com/apache/apisix-ingress-controller/pkg/types"
+       apisixv1 
"github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
 )
 
-type ApisixUpstreamController struct {
-       controller           *Controller
-       kubeclientset        kubernetes.Interface
-       apisixClientset      clientset.Interface
-       apisixUpstreamList   listersv1.ApisixUpstreamLister
-       apisixUpstreamSynced cache.InformerSynced
-       workqueue            workqueue.RateLimitingInterface
+type apisixUpstreamController struct {
+       controller *Controller
+       workqueue  workqueue.RateLimitingInterface
+       workers    int
 }
 
-func BuildApisixUpstreamController(
-       kubeclientset kubernetes.Interface,
-       apisixUpstreamClientset clientset.Interface,
-       apisixUpstreamInformer informersv1.ApisixUpstreamInformer,
-       root *Controller) *ApisixUpstreamController {
-
-       runtime.Must(apisixscheme.AddToScheme(scheme.Scheme))
-       controller := &ApisixUpstreamController{
-               controller:           root,
-               kubeclientset:        kubeclientset,
-               apisixClientset:      apisixUpstreamClientset,
-               apisixUpstreamList:   apisixUpstreamInformer.Lister(),
-               apisixUpstreamSynced: 
apisixUpstreamInformer.Informer().HasSynced,
-               workqueue:            
workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second,
 60*time.Second, 5), "ApisixUpstreams"),
-       }
-       apisixUpstreamInformer.Informer().AddEventHandler(
+func (c *Controller) newApisixUpstreamController() *apisixUpstreamController {
+       ctl := &apisixUpstreamController{
+               controller: c,
+               workqueue:  
workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), 
"ApisixUpstream"),
+               workers:    1,
+       }
+
+       ctl.controller.apisixUpstreamInformer.AddEventHandler(
                cache.ResourceEventHandlerFuncs{
-                       AddFunc:    controller.addFunc,
-                       UpdateFunc: controller.updateFunc,
-                       DeleteFunc: controller.deleteFunc,
-               })
-       return controller
+                       AddFunc:    ctl.onAdd,
+                       UpdateFunc: ctl.onUpdate,
+                       DeleteFunc: ctl.OnDelete,
+               },
+       )
+       return ctl
 }
 
-func (c *ApisixUpstreamController) Run(stop <-chan struct{}) error {
-       // 同步缓存
-       if ok := cache.WaitForCacheSync(stop); !ok {
-               log.Error("同步ApisixUpstream缓存失败")
-               return fmt.Errorf("failed to wait for caches to sync")
+func (c *apisixUpstreamController) run(ctx context.Context) {
+       log.Info("ApisixUpstream controller started")
+       defer log.Info("ApisixUpstream controller exited")
+       if ok := cache.WaitForCacheSync(ctx.Done(), 
c.controller.apisixUpstreamInformer.HasSynced, 
c.controller.svcInformer.HasSynced); !ok {
+               log.Errorf("cache sync failed")
+               return
        }
-       go wait.Until(c.runWorker, time.Second, stop)
-       return nil
+       for i := 0; i < c.workers; i++ {
+               go c.runWorker(ctx)
+       }
+
+       <-ctx.Done()
+       c.workqueue.ShutDown()
 }
 
-func (c *ApisixUpstreamController) runWorker() {
-       for c.processNextWorkItem() {
+func (c *apisixUpstreamController) runWorker(ctx context.Context) {
+       for {
+               obj, quit := c.workqueue.Get()
+               if quit {
+                       return
+               }
+               err := c.sync(ctx, obj.(*types.Event))
+               c.workqueue.Done(obj)
+               c.handleSyncErr(obj, err)
        }
 }
 
-func (c *ApisixUpstreamController) processNextWorkItem() bool {
-       defer recoverException()
-       obj, shutdown := c.workqueue.Get()
-       if shutdown {
-               return false
+func (c *apisixUpstreamController) sync(ctx context.Context, ev *types.Event) 
error {
+       key := ev.Object.(string)
+       namespace, name, err := cache.SplitMetaNamespaceKey(key)
+       if err != nil {
+               log.Errorf("found ApisixUpstream resource with invalid meta 
namespace key %s: %s", key, err)
+               return err
        }
-       err := func(obj interface{}) error {
-               defer c.workqueue.Done(obj)
-               var sqo *UpstreamQueueObj
-               var ok bool
 
-               if sqo, ok = obj.(*UpstreamQueueObj); !ok {
-                       c.workqueue.Forget(obj)
-                       return fmt.Errorf("expected string in workqueue but got 
%#v", obj)
+       au, err := 
c.controller.apisixUpstreamLister.ApisixUpstreams(namespace).Get(name)
+       if err != nil {
+               if !k8serrors.IsNotFound(err) {
+                       log.Errorf("failed to get ApisixUpstream %s: %s", key, 
err)
+                       return err
                }
-               // 在syncHandler中处理业务
-               if err := c.syncHandler(sqo); err != nil {
-                       c.workqueue.AddRateLimited(obj)
-                       return fmt.Errorf("error syncing '%s': %s", sqo.Key, 
err.Error())
+               if ev.Type != types.EventDelete {
+                       log.Warnf("ApisixUpstream %s was deleted before it can 
be delivered", key)
+                       // Don't need retry.
+                       return nil
                }
+       }
+       if ev.Type == types.EventDelete {
+               if au != nil {
+                       // We still find the resource while we are processing 
the DELETE event,
+                       // that means object with same namespace and name was 
created, discarding
+                       // this stale DELETE event.
+                       log.Warnf("discard the stale ApisixUpstream DELETE 
event since the %s exists", key)
+                       return nil
+               }
+               au = ev.Tombstone.(*configv1.ApisixUpstream)
+       }
 
-               c.workqueue.Forget(obj)
-               return nil
-       }(obj)
-       if err != nil {
-               runtime.HandleError(err)
+       var portLevelSettings map[int32]*configv1.ApisixUpstreamConfig
+       if len(au.Spec.PortLevelSettings) > 0 {
+               portLevelSettings = 
make(map[int32]*configv1.ApisixUpstreamConfig, len(au.Spec.PortLevelSettings))
+               for _, port := range au.Spec.PortLevelSettings {
+                       portLevelSettings[port.Port] = 
&port.ApisixUpstreamConfig
+               }
        }
-       return true
-}
 
-func (c *ApisixUpstreamController) syncHandler(sqo *UpstreamQueueObj) error {
-       namespace, name, err := cache.SplitMetaNamespaceKey(sqo.Key)
+       svc, err := c.controller.svcLister.Services(namespace).Get(name)
        if err != nil {
-               log.Errorf("invalid resource key: %s", sqo.Key)
-               return fmt.Errorf("invalid resource key: %s", sqo.Key)
-       }
-       apisixUpstreamYaml := sqo.OldObj
-       if sqo.Ope == DELETE {
-               apisixIngressUpstream, _ := 
c.apisixUpstreamList.ApisixUpstreams(namespace).Get(name)
-               if apisixIngressUpstream != nil && 
apisixIngressUpstream.ResourceVersion > sqo.OldObj.ResourceVersion {
-                       log.Warnf("Upstream %s has been covered when retry", 
sqo.Key)
-                       return nil
-               }
-       } else {
-               apisixUpstreamYaml, err = 
c.apisixUpstreamList.ApisixUpstreams(namespace).Get(name)
+               log.Errorf("failed to get service %s: %s", key, err)
+               return err
+       }
+
+       for _, port := range svc.Spec.Ports {
+               upsName := apisixv1.ComposeUpstreamName(namespace, name, 
port.Port)
+               // TODO: multiple cluster
+               ups, err := c.controller.apisix.Cluster("").Upstream().Get(ctx, 
upsName)
                if err != nil {
-                       if errors.IsNotFound(err) {
-                               log.Infof("apisixUpstream %s is removed", 
sqo.Key)
-                               return nil
+                       if err == apisixcache.ErrNotFound {
+                               continue
                        }
-                       runtime.HandleError(fmt.Errorf("failed to list 
apisixUpstream %s/%s", sqo.Key, err.Error()))
+                       log.Errorf("failed to get upstream %s: %s", upsName, 
err)
                        return err
                }
-       }
-       aub := apisix.ApisixUpstreamBuilder{CRD: apisixUpstreamYaml, Ep: 
&endpoint.EndpointRequest{}}
-       upstreams, _ := aub.Convert()
-       comb := state.ApisixCombination{Routes: nil, Services: nil, Upstreams: 
upstreams}
-       if sqo.Ope == DELETE {
-               return comb.Remove()
-       } else {
-               _, err = comb.Solver()
-               return err
-       }
+               var newUps *apisixv1.Upstream
+               if ev.Type != types.EventDelete {
+                       cfg, ok := portLevelSettings[port.Port]
+                       if !ok {
+                               cfg = &au.Spec.ApisixUpstreamConfig
+                       }
+                       // FIXME Same ApisixUpstreamConfig might be translated 
multiple times.
+                       newUps, err = 
c.controller.translator.TranslateUpstreamConfig(cfg)
+                       if err != nil {
+                               log.Errorw("found malformed ApisixUpstream",
+                                       zap.Any("object", au),
+                                       zap.Error(err),
+                               )
+                               return err
+                       }
+               } else {
+                       newUps = apisixv1.NewDefaultUpstream()
+               }
 
+               newUps.Metadata = ups.Metadata
+               newUps.Nodes = ups.Nodes
+               log.Debugw("updating upstream since ApisixUpstream changed",
+                       zap.String("event", ev.Type.String()),
+                       zap.Any("upstream", newUps),
+                       zap.Any("ApisixUpstream", au),
+               )
+               if _, err := 
c.controller.apisix.Cluster("").Upstream().Update(ctx, newUps); err != nil {
+                       log.Errorw("failed to update upstream",
+                               zap.Error(err),
+                               zap.Any("upstream", newUps),
+                               zap.Any("ApisixUpstream", au),
+                       )
+                       return err
+               }
+       }
+       return nil
 }
 
-type UpstreamQueueObj struct {
-       Key    string                   `json:"key"`
-       OldObj *configv1.ApisixUpstream `json:"old_obj"`
-       Ope    string                   `json:"ope"` // add / update / delete
+func (c *apisixUpstreamController) handleSyncErr(obj interface{}, err error) {
+       if err == nil {
+               c.workqueue.Forget(obj)
+               return
+       }
+       if c.workqueue.NumRequeues(obj) < _maxRetries {
+               log.Infow("sync endpoints failed, will retry",
+                       zap.Any("object", obj),
+               )
+               c.workqueue.AddRateLimited(obj)
+       } else {
+               c.workqueue.Forget(obj)
+               log.Warnf("drop endpoints %+v out of the queue", obj)
+       }
 }
 
-func (c *ApisixUpstreamController) addFunc(obj interface{}) {
-       var key string
-       var err error
-       if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
-               runtime.HandleError(err)
+func (c *apisixUpstreamController) onAdd(obj interface{}) {
+       key, err := cache.MetaNamespaceKeyFunc(obj)
+       if err != nil {
+               log.Errorf("found ApisixUpstream resource with bad meta 
namesapce key: %s", err)
                return
        }
        if !c.controller.namespaceWatching(key) {
                return
        }
-       sqo := &UpstreamQueueObj{Key: key, OldObj: nil, Ope: ADD}
-       c.workqueue.AddRateLimited(sqo)
+       log.Debugw("ApisixUpstream add event arrived",
+               zap.Any("object", obj))
+
+       c.workqueue.AddRateLimited(&types.Event{
+               Type:   types.EventAdd,
+               Object: key,
+       })
 }
 
-func (c *ApisixUpstreamController) updateFunc(oldObj, newObj interface{}) {
-       oldUpstream := oldObj.(*configv1.ApisixUpstream)
-       newUpstream := newObj.(*configv1.ApisixUpstream)
-       if oldUpstream.ResourceVersion >= newUpstream.ResourceVersion {
+func (c *apisixUpstreamController) onUpdate(oldObj, newObj interface{}) {
+       prev := oldObj.(*configv1.ApisixUpstream)
+       curr := newObj.(*configv1.ApisixUpstream)
+       if prev.ResourceVersion >= curr.ResourceVersion {
                return
        }
-       var (
-               key string
-               err error
-       )
-       if key, err = cache.MetaNamespaceKeyFunc(newObj); err != nil {
-               runtime.HandleError(err)
+       key, err := cache.MetaNamespaceKeyFunc(newObj)
+       if err != nil {
+               log.Errorf("found ApisixUpstream resource with bad meta 
namespace key: %s", err)
                return
        }
-       sqo := &UpstreamQueueObj{Key: key, OldObj: oldUpstream, Ope: UPDATE}
-       c.addFunc(sqo)
+       log.Debugw("ApisixUpstream update event arrived",
+               zap.Any("new object", curr),
+               zap.Any("old object", prev),
+       )
+
+       c.workqueue.AddRateLimited(&types.Event{
+               Type:   types.EventUpdate,
+               Object: key,
+       })
 }
 
-func (c *ApisixUpstreamController) deleteFunc(obj interface{}) {
-       oldUpstream, ok := obj.(*configv1.ApisixUpstream)
+func (c *apisixUpstreamController) OnDelete(obj interface{}) {
+       au, ok := obj.(*configv1.ApisixUpstream)
        if !ok {
-               oldState, ok := obj.(cache.DeletedFinalStateUnknown)
-               if !ok {
-                       return
-               }
-               oldUpstream, ok = oldState.Obj.(*configv1.ApisixUpstream)
+               tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
                if !ok {
                        return
                }
+               au = tombstone.Obj.(*configv1.ApisixUpstream)
        }
-       var key string
-       var err error
-       key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+
+       key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
        if err != nil {
-               runtime.HandleError(err)
+               log.Errorf("found ApisixUpstream resource with bad meta 
namesapce key: %s", err)
                return
        }
        if !c.controller.namespaceWatching(key) {
                return
        }
-       sqo := &UpstreamQueueObj{Key: key, OldObj: oldUpstream, Ope: DELETE}
-       c.workqueue.AddRateLimited(sqo)
+       log.Debugw("ApisixUpstream delete event arrived",
+               zap.Any("final state", au),
+       )
+       c.workqueue.AddRateLimited(&types.Event{
+               Type:      types.EventDelete,
+               Object:    key,
+               Tombstone: au,
+       })
 }
diff --git a/pkg/ingress/controller/controller.go 
b/pkg/ingress/controller/controller.go
index a08276d..6772637 100644
--- a/pkg/ingress/controller/controller.go
+++ b/pkg/ingress/controller/controller.go
@@ -20,6 +20,7 @@ import (
        "sync"
        "time"
 
+       listersv1 
"github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v1"
        "go.uber.org/zap"
        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -58,6 +59,7 @@ type Controller struct {
        wg                 sync.WaitGroup
        watchingNamespace  map[string]struct{}
        apisix             apisix.APISIX
+       translator         kube.Translator
        apiServer          *api.Server
        clientset          kubernetes.Interface
        crdClientset       crdclientset.Interface
@@ -65,11 +67,17 @@ type Controller struct {
        crdController      *Api6Controller
        crdInformerFactory externalversions.SharedInformerFactory
 
-       // informers and listers
-       epInformer cache.SharedIndexInformer
-       epLister   listerscorev1.EndpointsLister
-
-       endpointsController *endpointsController
+       // common informers and listers
+       epInformer             cache.SharedIndexInformer
+       epLister               listerscorev1.EndpointsLister
+       svcInformer            cache.SharedIndexInformer
+       svcLister              listerscorev1.ServiceLister
+       apisixUpstreamInformer cache.SharedIndexInformer
+       apisixUpstreamLister   listersv1.ApisixUpstreamLister
+
+       // resource conrollers
+       endpointsController      *endpointsController
+       apisixUpstreamController *apisixUpstreamController
 }
 
 // NewController creates an ingress apisix controller object.
@@ -118,11 +126,22 @@ func NewController(cfg *config.Config) (*Controller, 
error) {
                crdInformerFactory: sharedInformerFactory,
                watchingNamespace:  watchingNamespace,
 
-               epInformer: 
kube.CoreSharedInformerFactory.Core().V1().Endpoints().Informer(),
-               epLister:   
kube.CoreSharedInformerFactory.Core().V1().Endpoints().Lister(),
+               epInformer:             
kube.CoreSharedInformerFactory.Core().V1().Endpoints().Informer(),
+               epLister:               
kube.CoreSharedInformerFactory.Core().V1().Endpoints().Lister(),
+               svcInformer:            
kube.CoreSharedInformerFactory.Core().V1().Services().Informer(),
+               svcLister:              
kube.CoreSharedInformerFactory.Core().V1().Services().Lister(),
+               apisixUpstreamInformer: 
sharedInformerFactory.Apisix().V1().ApisixUpstreams().Informer(),
+               apisixUpstreamLister:   
sharedInformerFactory.Apisix().V1().ApisixUpstreams().Lister(),
        }
+       c.translator = kube.NewTranslator(&kube.TranslatorOptions{
+               EndpointsLister:      c.epLister,
+               ServiceLister:        c.svcLister,
+               ApisixUpstreamLister: c.apisixUpstreamLister,
+       })
+
+       c.endpointsController = c.newEndpointsController()
+       c.apisixUpstreamController = c.newApisixUpstreamController()
 
-       c.endpointsController = c.newEndpointsController(c.epInformer, 
c.epLister)
        return c, nil
 }
 
@@ -237,10 +256,16 @@ func (c *Controller) run(ctx context.Context) {
        c.goAttach(func() {
                c.epInformer.Run(ctx.Done())
        })
+       c.goAttach(func() {
+               c.svcInformer.Run(ctx.Done())
+       })
 
        c.goAttach(func() {
                c.endpointsController.run(ctx)
        })
+       c.goAttach(func() {
+               c.apisixUpstreamController.run(ctx)
+       })
 
        ac := &Api6Controller{
                KubeClientSet:             c.clientset,
@@ -252,8 +277,6 @@ func (c *Controller) run(ctx context.Context) {
 
        // ApisixRoute
        ac.ApisixRoute(c)
-       // ApisixUpstream
-       ac.ApisixUpstream(c)
        // ApisixTLS
        ac.ApisixTLS(c)
 
@@ -302,17 +325,6 @@ func (api6 *Api6Controller) ApisixRoute(controller 
*Controller) {
        }
 }
 
-func (api6 *Api6Controller) ApisixUpstream(controller *Controller) {
-       auc := BuildApisixUpstreamController(
-               api6.KubeClientSet,
-               api6.Api6ClientSet,
-               api6.SharedInformerFactory.Apisix().V1().ApisixUpstreams(),
-               controller)
-       if err := auc.Run(api6.Stop); err != nil {
-               log.Errorf("failed to run ApisixUpstreamController: %s", err)
-       }
-}
-
 func (api6 *Api6Controller) ApisixTLS(controller *Controller) {
        atc := BuildApisixTlsController(
                api6.KubeClientSet,
diff --git a/pkg/ingress/controller/endpoint.go 
b/pkg/ingress/controller/endpoint.go
index c2937d6..1406b72 100644
--- a/pkg/ingress/controller/endpoint.go
+++ b/pkg/ingress/controller/endpoint.go
@@ -16,11 +16,10 @@ package controller
 
 import (
        "context"
-       "fmt"
 
        "go.uber.org/zap"
        corev1 "k8s.io/api/core/v1"
-       listerscorev1 "k8s.io/client-go/listers/core/v1"
+       k8serrors "k8s.io/apimachinery/pkg/api/errors"
        "k8s.io/client-go/tools/cache"
        "k8s.io/client-go/util/workqueue"
 
@@ -33,29 +32,24 @@ import (
 )
 
 const (
-       _defaultNodeWeight = 100
        // maxRetries is the number of times an object will be retried before 
it is dropped out of the queue.
        _maxRetries = 10
 )
 
 type endpointsController struct {
        controller *Controller
-       informer   cache.SharedIndexInformer
-       lister     listerscorev1.EndpointsLister
        workqueue  workqueue.RateLimitingInterface
        workers    int
 }
 
-func (c *Controller) newEndpointsController(informer 
cache.SharedIndexInformer, lister listerscorev1.EndpointsLister) 
*endpointsController {
+func (c *Controller) newEndpointsController() *endpointsController {
        ctl := &endpointsController{
                controller: c,
-               informer:   informer,
-               lister:     lister,
                workqueue:  
workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), 
"endpoints"),
                workers:    1,
        }
 
-       ctl.informer.AddEventHandler(
+       ctl.controller.epInformer.AddEventHandler(
                cache.ResourceEventHandlerFuncs{
                        AddFunc:    ctl.onAdd,
                        UpdateFunc: ctl.onUpdate,
@@ -70,7 +64,7 @@ func (c *endpointsController) run(ctx context.Context) {
        log.Info("endpoints controller started")
        defer log.Info("endpoints controller exited")
 
-       if ok := cache.WaitForCacheSync(ctx.Done(), c.informer.HasSynced); !ok {
+       if ok := cache.WaitForCacheSync(ctx.Done(), 
c.controller.epInformer.HasSynced); !ok {
                log.Error("informers sync failed")
                return
        }
@@ -98,17 +92,39 @@ func (c *endpointsController) run(ctx context.Context) {
 
 func (c *endpointsController) sync(ctx context.Context, ev *types.Event) error 
{
        ep := ev.Object.(*corev1.Endpoints)
+       svc, err := c.controller.svcLister.Services(ep.Namespace).Get(ep.Name)
+       if err != nil {
+               if k8serrors.IsNotFound(err) {
+                       log.Warnf("service %s/%s was deleted", ep.Namespace, 
ep.Name)
+                       return nil
+               }
+               log.Errorf("failed to get service %s/%s: %s", ep.Namespace, 
ep.Name, err)
+               return err
+       }
+       portMap := make(map[string]int32)
+       for _, port := range svc.Spec.Ports {
+               portMap[port.Name] = port.Port
+       }
        clusters := c.controller.apisix.ListClusters()
        for _, s := range ep.Subsets {
                for _, port := range s.Ports {
-                       // FIXME this is wrong, we should use the port name as 
the key.
-                       upstream := fmt.Sprintf("%s_%s_%d", ep.Namespace, 
ep.Name, port.Port)
+                       svcPort, ok := portMap[port.Name]
+                       if !ok {
+                               // This shouldn't happen.
+                               log.Errorf("port %s in endpoints %s/%s but not 
in service", port.Name, ep.Namespace, ep.Name)
+                               continue
+                       }
+                       nodes, err := 
c.controller.translator.TranslateUpstreamNodes(ep, svcPort)
+                       if err != nil {
+                               log.Errorw("failed to translate upstream nodes",
+                                       zap.Error(err),
+                                       zap.Any("endpoints", ep),
+                                       zap.Int32("port", svcPort),
+                               )
+                       }
+                       name := apisixv1.ComposeUpstreamName(ep.Namespace, 
ep.Name, svcPort)
                        for _, cluster := range clusters {
-                               var addresses []corev1.EndpointAddress
-                               if ev.Type != types.EventDelete {
-                                       addresses = s.Addresses
-                               }
-                               if err := c.syncToCluster(ctx, upstream, 
cluster, addresses, int(port.Port)); err != nil {
+                               if err := c.syncToCluster(ctx, cluster, nodes, 
name); err != nil {
                                        return err
                                }
                        }
@@ -117,19 +133,18 @@ func (c *endpointsController) sync(ctx context.Context, 
ev *types.Event) error {
        return nil
 }
 
-func (c *endpointsController) syncToCluster(ctx context.Context, upstreamName 
string,
-       cluster apisix.Cluster, addresses []corev1.EndpointAddress, port int) 
error {
-       upstream, err := cluster.Upstream().Get(ctx, upstreamName)
+func (c *endpointsController) syncToCluster(ctx context.Context, cluster 
apisix.Cluster, nodes []apisixv1.UpstreamNode, upsName string) error {
+       upstream, err := cluster.Upstream().Get(ctx, upsName)
        if err != nil {
                if err == apisixcache.ErrNotFound {
                        log.Warnw("upstream is not referenced",
                                zap.String("cluster", cluster.String()),
-                               zap.String("upstream", upstreamName),
+                               zap.String("upstream", upsName),
                        )
                        return nil
                } else {
                        log.Errorw("failed to get upstream",
-                               zap.String("upstream", upstreamName),
+                               zap.String("upstream", upsName),
                                zap.String("cluster", cluster.String()),
                                zap.Error(err),
                        )
@@ -137,17 +152,10 @@ func (c *endpointsController) syncToCluster(ctx 
context.Context, upstreamName st
                }
        }
 
-       nodes := make([]apisixv1.Node, 0, len(addresses))
-       for _, address := range addresses {
-               nodes = append(nodes, apisixv1.Node{
-                       IP:     address.IP,
-                       Port:   port,
-                       Weight: _defaultNodeWeight,
-               })
-       }
        log.Debugw("upstream binds new nodes",
-               zap.String("upstream", upstreamName),
+               zap.String("upstream", upsName),
                zap.Any("nodes", nodes),
+               zap.String("cluster", cluster.String()),
        )
 
        upstream.Nodes = nodes
@@ -157,7 +165,7 @@ func (c *endpointsController) syncToCluster(ctx 
context.Context, upstreamName st
 
        if _, err = comb.Solver(); err != nil {
                log.Errorw("failed to sync upstream",
-                       zap.String("upstream", upstreamName),
+                       zap.String("upstream", upsName),
                        zap.String("cluster", cluster.String()),
                        zap.Error(err),
                )
diff --git a/pkg/ingress/endpoint/ep.go b/pkg/ingress/endpoint/ep.go
deleted file mode 100644
index 0ebcebc..0000000
--- a/pkg/ingress/endpoint/ep.go
+++ /dev/null
@@ -1,61 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package endpoint
-
-import (
-       "github.com/golang/glog"
-
-       "github.com/apache/apisix-ingress-controller/pkg/kube"
-       v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
-)
-
-type Endpoint interface {
-       BuildEps(ns, name string, port int) []v1.Node
-}
-
-type EndpointRequest struct{}
-
-func (epr *EndpointRequest) BuildEps(ns, name string, port int) []v1.Node {
-       nodes := make([]v1.Node, 0)
-       epInformers := kube.EndpointsInformer
-       if ep, err := epInformers.Lister().Endpoints(ns).Get(name); err != nil {
-               glog.Errorf("find endpoint %s/%s err: %s", ns, name, 
err.Error())
-       } else {
-               for _, s := range ep.Subsets {
-                       for _, ip := range s.Addresses {
-                               node := v1.Node{IP: ip.IP, Port: port, Weight: 
100}
-                               nodes = append(nodes, node)
-                       }
-               }
-       }
-       return nodes
-}
-
-// BuildEps build nodes from endpoints for upstream
-func BuildEps(ns, name string, port int) []v1.Node {
-       nodes := make([]v1.Node, 0)
-       epInformers := kube.EndpointsInformer
-       if ep, err := epInformers.Lister().Endpoints(ns).Get(name); err != nil {
-               glog.Errorf("find endpoint %s/%s err: %s", ns, name, 
err.Error())
-       } else {
-               for _, s := range ep.Subsets {
-                       for _, ip := range s.Addresses {
-                               node := v1.Node{IP: ip.IP, Port: port, Weight: 
100}
-                               nodes = append(nodes, node)
-                       }
-               }
-       }
-       return nodes
-}
diff --git a/pkg/kube/apisix/apis/config/v1/types.go 
b/pkg/kube/apisix/apis/config/v1/types.go
index ccc9fd0..b9648ea 100644
--- a/pkg/kube/apisix/apis/config/v1/types.go
+++ b/pkg/kube/apisix/apis/config/v1/types.go
@@ -74,35 +74,46 @@ type ApisixRouteList struct {
 // +genclient:noStatus
 
 // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
-// ApisixUpstream is used to decorate Upstream in APISIX, such as load
-// balacing type.
+// ApisixUpstream is a decorator for Kubernetes Service, it arms the Service
+// with rich features like health check, retry policies, load balancer and 
others.
+// It's designed to have same name with the Kubernetes Service and can be 
customized
+// for individual port.
 type ApisixUpstream struct {
        metav1.TypeMeta   `json:",inline" yaml:",inline"`
        metav1.ObjectMeta `json:"metadata,omitempty" yaml:"metadata,omitempty"`
-       Spec              *ApisixUpstreamSpec `json:"spec,omitempty" 
yaml:"spec,omitempty"`
+
+       Spec *ApisixUpstreamSpec `json:"spec,omitempty" yaml:"spec,omitempty"`
 }
 
-// ApisixUpstreamSpec describes the specification of Upstream in APISIX.
+// ApisixUpstreamSpec describes the specification of ApisixUpstream.
 type ApisixUpstreamSpec struct {
-       Ports []Port `json:"ports,omitempty"`
+       ApisixUpstreamConfig `json:",inline" yaml:",inline"`
+
+       PortLevelSettings []PortLevelSettings 
`json:"portLevelSettings,omitempty" yaml:"portLevelSettings,omitempty"`
 }
 
-// Port is the port-specific configurations.
-type Port struct {
-       Port         int           `json:"port,omitempty"`
-       LoadBalancer *LoadBalancer `json:"loadbalancer,omitempty"`
+// ApisixUpstreamConfig contains rich features on APISIX Upstream, for instance
+// load balancer, health check and etc.
+type ApisixUpstreamConfig struct {
+       // LoadBalancer represents the load balancer configuration for 
Kubernetes Service.
+       // The default strategy is round robin.
+       // +optional
+       LoadBalancer *LoadBalancer `json:"loadbalancer,omitempty" 
yaml:"loadbalancer,omitempty"`
        // The scheme used to talk with the upstream.
        // Now value can be http, grpc.
        // +optional
        Scheme string `json:"scheme,omitempty" yaml:"scheme,omitempty"`
 }
 
-var (
-       // SchemeHTTP represents the HTTP protocol.
-       SchemeHTTP = "http"
-       // SchemeGRPC represents the GRPC protocol.
-       SchemeGRPC = "grpc"
-)
+// PortLevelSettings configures the ApisixUpstreamConfig for each individual 
port. It inherits
+// configurations from the outer level (the whole Kubernetes Service) and 
overrides some of
+// them if they are set on the port level.
+type PortLevelSettings struct {
+       ApisixUpstreamConfig `json:",inline" yaml:",inline"`
+
+       // Port is a Kubernetes Service port, it should be already defined.
+       Port int32 `json:"port" yaml:"port"`
+}
 
 // LoadBalancer describes the load balancing parameters.
 type LoadBalancer struct {
diff --git a/pkg/kube/apisix/apis/config/v1/zz_generated.deepcopy.go 
b/pkg/kube/apisix/apis/config/v1/zz_generated.deepcopy.go
index 5ec8da8..82ccbb5 100644
--- a/pkg/kube/apisix/apis/config/v1/zz_generated.deepcopy.go
+++ b/pkg/kube/apisix/apis/config/v1/zz_generated.deepcopy.go
@@ -245,6 +245,27 @@ func (in *ApisixUpstream) DeepCopyObject() runtime.Object {
 }
 
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, 
writing into out. in must be non-nil.
+func (in *ApisixUpstreamConfig) DeepCopyInto(out *ApisixUpstreamConfig) {
+       *out = *in
+       if in.LoadBalancer != nil {
+               in, out := &in.LoadBalancer, &out.LoadBalancer
+               *out = new(LoadBalancer)
+               **out = **in
+       }
+       return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, 
creating a new ApisixUpstreamConfig.
+func (in *ApisixUpstreamConfig) DeepCopy() *ApisixUpstreamConfig {
+       if in == nil {
+               return nil
+       }
+       out := new(ApisixUpstreamConfig)
+       in.DeepCopyInto(out)
+       return out
+}
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, 
writing into out. in must be non-nil.
 func (in *ApisixUpstreamList) DeepCopyInto(out *ApisixUpstreamList) {
        *out = *in
        out.TypeMeta = in.TypeMeta
@@ -280,9 +301,10 @@ func (in *ApisixUpstreamList) DeepCopyObject() 
runtime.Object {
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, 
writing into out. in must be non-nil.
 func (in *ApisixUpstreamSpec) DeepCopyInto(out *ApisixUpstreamSpec) {
        *out = *in
-       if in.Ports != nil {
-               in, out := &in.Ports, &out.Ports
-               *out = make([]Port, len(*in))
+       in.ApisixUpstreamConfig.DeepCopyInto(&out.ApisixUpstreamConfig)
+       if in.PortLevelSettings != nil {
+               in, out := &in.PortLevelSettings, &out.PortLevelSettings
+               *out = make([]PortLevelSettings, len(*in))
                for i := range *in {
                        (*in)[i].DeepCopyInto(&(*out)[i])
                }
@@ -398,22 +420,18 @@ func (in *Plugin) DeepCopy() *Plugin {
 }
 
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, 
writing into out. in must be non-nil.
-func (in *Port) DeepCopyInto(out *Port) {
+func (in *PortLevelSettings) DeepCopyInto(out *PortLevelSettings) {
        *out = *in
-       if in.LoadBalancer != nil {
-               in, out := &in.LoadBalancer, &out.LoadBalancer
-               *out = new(LoadBalancer)
-               **out = **in
-       }
+       in.ApisixUpstreamConfig.DeepCopyInto(&out.ApisixUpstreamConfig)
        return
 }
 
-// DeepCopy is an autogenerated deepcopy function, copying the receiver, 
creating a new Port.
-func (in *Port) DeepCopy() *Port {
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, 
creating a new PortLevelSettings.
+func (in *PortLevelSettings) DeepCopy() *PortLevelSettings {
        if in == nil {
                return nil
        }
-       out := new(Port)
+       out := new(PortLevelSettings)
        in.DeepCopyInto(out)
        return out
 }
diff --git a/pkg/kube/translator.go b/pkg/kube/translator.go
new file mode 100644
index 0000000..226bccd
--- /dev/null
+++ b/pkg/kube/translator.go
@@ -0,0 +1,204 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package kube
+
+import (
+       "fmt"
+
+       corev1 "k8s.io/api/core/v1"
+       k8serrors "k8s.io/apimachinery/pkg/api/errors"
+       listerscorev1 "k8s.io/client-go/listers/core/v1"
+
+       configv1 
"github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
+       listersv1 
"github.com/apache/apisix-ingress-controller/pkg/kube/apisix/client/listers/config/v1"
+       apisixv1 
"github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
+)
+
+const (
+       _defaultWeight = 100
+)
+
+type translateError struct {
+       field  string
+       reason string
+}
+
+func (te *translateError) Error() string {
+       return fmt.Sprintf("%s: %s", te.field, te.reason)
+}
+
+// Translator translates Apisix* CRD resources to the description in APISIX.
+type Translator interface {
+       // TranslateUpstreamNodes translate Endpoints resources to APISIX 
Upstream nodes
+       // according to the give port.
+       TranslateUpstreamNodes(*corev1.Endpoints, int32) 
([]apisixv1.UpstreamNode, error)
+       // TranslateUpstreamConfig translates ApisixUpstreamConfig (part of 
ApisixUpstream)
+       // to APISIX Upstream, it doesn't fill the the Upstream metadata and 
nodes.
+       TranslateUpstreamConfig(config *configv1.ApisixUpstreamConfig) 
(*apisixv1.Upstream, error)
+       // TranslateUpstream composes an upstream according to the
+       // given namespace, name (searching Service/Endpoints) and port 
(filtering Endpoints).
+       // The returned Upstream doesn't have metadata info.
+       TranslateUpstream(string, string, int32) (*apisixv1.Upstream, error)
+}
+
+// TranslatorOptions contains options to help Translator
+// work well.
+type TranslatorOptions struct {
+       EndpointsLister      listerscorev1.EndpointsLister
+       ServiceLister        listerscorev1.ServiceLister
+       ApisixUpstreamLister listersv1.ApisixUpstreamLister
+}
+
+type translator struct {
+       *TranslatorOptions
+}
+
+// NewTranslator initializes a APISIX CRD resources Translator.
+func NewTranslator(opts *TranslatorOptions) Translator {
+       return &translator{
+               TranslatorOptions: opts,
+       }
+}
+
+func (t *translator) TranslateUpstreamConfig(au 
*configv1.ApisixUpstreamConfig) (*apisixv1.Upstream, error) {
+       ups := apisixv1.NewDefaultUpstream()
+
+       if au.Scheme == "" {
+               au.Scheme = apisixv1.SchemeHTTP
+       } else {
+               switch au.Scheme {
+               case apisixv1.SchemeHTTP, apisixv1.SchemeGRPC:
+                       ups.Scheme = au.Scheme
+               default:
+                       return nil, &translateError{field: "scheme", reason: 
"invalid value"}
+               }
+       }
+
+       if au.LoadBalancer == nil || au.LoadBalancer.Type == "" {
+               ups.Type = apisixv1.LbRoundRobin
+       } else {
+               switch au.LoadBalancer.Type {
+               case apisixv1.LbRoundRobin, apisixv1.LbLeastConn, 
apisixv1.LbEwma:
+                       ups.Type = au.LoadBalancer.Type
+               case apisixv1.LbConsistentHash:
+                       ups.Type = au.LoadBalancer.Type
+                       ups.Key = au.LoadBalancer.Key
+                       switch au.LoadBalancer.HashOn {
+                       case apisixv1.HashOnVars:
+                               fallthrough
+                       case apisixv1.HashOnHeader:
+                               fallthrough
+                       case apisixv1.HashOnCookie:
+                               fallthrough
+                       case apisixv1.HashOnConsumer:
+                               fallthrough
+                       case apisixv1.HashOnVarsCombination:
+                               ups.HashOn = au.LoadBalancer.HashOn
+                       default:
+                               return nil, &translateError{field: 
"loadbalancer.hashOn", reason: "invalid value"}
+                       }
+               default:
+                       return nil, &translateError{
+                               field:  "loadbalancer.type",
+                               reason: "invalid value",
+                       }
+               }
+       }
+       return ups, nil
+}
+
+func (t *translator) TranslateUpstream(namespace, name string, port int32) 
(*apisixv1.Upstream, error) {
+       endpoints, err := t.EndpointsLister.Endpoints(namespace).Get(name)
+       if err != nil {
+               return nil, &translateError{
+                       field:  "endpoints",
+                       reason: err.Error(),
+               }
+       }
+       nodes, err := t.TranslateUpstreamNodes(endpoints, port)
+       if err != nil {
+               return nil, err
+       }
+       ups := apisixv1.NewDefaultUpstream()
+       au, err := t.ApisixUpstreamLister.ApisixUpstreams(namespace).Get(name)
+       if err != nil {
+               if k8serrors.IsNotFound(err) {
+                       ups.Nodes = nodes
+                       return ups, nil
+               }
+               return nil, &translateError{
+                       field:  "ApisixUpstream",
+                       reason: err.Error(),
+               }
+       }
+       upsCfg := &au.Spec.ApisixUpstreamConfig
+       for _, pls := range au.Spec.PortLevelSettings {
+               if pls.Port == port {
+                       upsCfg = &pls.ApisixUpstreamConfig
+                       break
+               }
+       }
+       ups, err = t.TranslateUpstreamConfig(upsCfg)
+       if err != nil {
+               return nil, err
+       }
+       ups.Nodes = nodes
+       return ups, nil
+}
+
+func (t *translator) TranslateUpstreamNodes(endpoints *corev1.Endpoints, port 
int32) ([]apisixv1.UpstreamNode, error) {
+       svc, err := 
t.ServiceLister.Services(endpoints.Namespace).Get(endpoints.Name)
+       if err != nil {
+               return nil, &translateError{
+                       field:  "service",
+                       reason: err.Error(),
+               }
+       }
+
+       var svcPort *corev1.ServicePort
+       for _, exposePort := range svc.Spec.Ports {
+               if exposePort.Port == port {
+                       svcPort = &exposePort
+                       break
+               }
+       }
+       if svcPort == nil {
+               return nil, &translateError{
+                       field:  "service.spec.ports",
+                       reason: "port not defined",
+               }
+       }
+       var nodes []apisixv1.UpstreamNode
+       for _, subset := range endpoints.Subsets {
+               var epPort *corev1.EndpointPort
+               for _, port := range subset.Ports {
+                       if port.Name == svcPort.Name {
+                               epPort = &port
+                               break
+                       }
+               }
+               if epPort != nil {
+                       for _, addr := range subset.Addresses {
+                               nodes = append(nodes, apisixv1.UpstreamNode{
+                                       IP:   addr.IP,
+                                       Port: int(epPort.Port),
+                                       // FIXME Custom node weight
+                                       Weight: _defaultWeight,
+                               })
+                       }
+               }
+       }
+       return nodes, nil
+}
diff --git a/pkg/kube/translator_test.go b/pkg/kube/translator_test.go
new file mode 100644
index 0000000..743fa7d
--- /dev/null
+++ b/pkg/kube/translator_test.go
@@ -0,0 +1,214 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package kube
+
+import (
+       "context"
+       "testing"
+
+       apisixv1 
"github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
+       "github.com/stretchr/testify/assert"
+       corev1 "k8s.io/api/core/v1"
+       metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+       "k8s.io/apimachinery/pkg/util/intstr"
+       "k8s.io/client-go/informers"
+       "k8s.io/client-go/kubernetes/fake"
+       "k8s.io/client-go/tools/cache"
+
+       configv1 
"github.com/apache/apisix-ingress-controller/pkg/kube/apisix/apis/config/v1"
+)
+
+func TestTranslateUpstreamConfig(t *testing.T) {
+       tr := &translator{}
+
+       au := &configv1.ApisixUpstreamConfig{
+               LoadBalancer: nil,
+               Scheme:       apisixv1.SchemeGRPC,
+       }
+
+       ups, err := tr.TranslateUpstreamConfig(au)
+       assert.Nil(t, err, "checking upstream config translating")
+       assert.Equal(t, ups.Type, apisixv1.LbRoundRobin)
+       assert.Equal(t, ups.Scheme, apisixv1.SchemeGRPC)
+
+       au = &configv1.ApisixUpstreamConfig{
+               LoadBalancer: &configv1.LoadBalancer{
+                       Type:   apisixv1.LbConsistentHash,
+                       HashOn: apisixv1.HashOnHeader,
+                       Key:    "user-agent",
+               },
+               Scheme: apisixv1.SchemeHTTP,
+       }
+       ups, err = tr.TranslateUpstreamConfig(au)
+       assert.Nil(t, err, "checking upstream config translating")
+       assert.Equal(t, ups.Type, apisixv1.LbConsistentHash)
+       assert.Equal(t, ups.Key, "user-agent")
+       assert.Equal(t, ups.HashOn, apisixv1.HashOnHeader)
+       assert.Equal(t, ups.Scheme, apisixv1.SchemeHTTP)
+
+       au = &configv1.ApisixUpstreamConfig{
+               LoadBalancer: &configv1.LoadBalancer{
+                       Type:   apisixv1.LbConsistentHash,
+                       HashOn: apisixv1.HashOnHeader,
+                       Key:    "user-agent",
+               },
+               Scheme: "dns",
+       }
+       _, err = tr.TranslateUpstreamConfig(au)
+       assert.Error(t, err, &translateError{
+               field:  "scheme",
+               reason: "invalid value",
+       })
+
+       au = &configv1.ApisixUpstreamConfig{
+               LoadBalancer: &configv1.LoadBalancer{
+                       Type: "hash",
+               },
+       }
+       _, err = tr.TranslateUpstreamConfig(au)
+       assert.Error(t, err, &translateError{
+               field:  "loadbalancer.type",
+               reason: "invalid value",
+       })
+
+       au = &configv1.ApisixUpstreamConfig{
+               LoadBalancer: &configv1.LoadBalancer{
+                       Type:   apisixv1.LbConsistentHash,
+                       HashOn: "arg",
+               },
+       }
+       _, err = tr.TranslateUpstreamConfig(au)
+       assert.Error(t, err, &translateError{
+               field:  "loadbalancer.hashOn",
+               reason: "invalid value",
+       })
+}
+
+func TestTranslateUpstreamNodes(t *testing.T) {
+       svc := &corev1.Service{
+               TypeMeta: metav1.TypeMeta{},
+               ObjectMeta: metav1.ObjectMeta{
+                       Name:      "svc",
+                       Namespace: "test",
+               },
+               Spec: corev1.ServiceSpec{
+                       Ports: []corev1.ServicePort{
+                               {
+                                       Name: "port1",
+                                       Port: 80,
+                                       TargetPort: intstr.IntOrString{
+                                               Type:   intstr.Int,
+                                               IntVal: 9080,
+                                       },
+                               },
+                               {
+                                       Name: "port2",
+                                       Port: 443,
+                                       TargetPort: intstr.IntOrString{
+                                               Type:   intstr.Int,
+                                               IntVal: 9443,
+                                       },
+                               },
+                       },
+               },
+       }
+       endpoints := &corev1.Endpoints{
+               TypeMeta: metav1.TypeMeta{},
+               ObjectMeta: metav1.ObjectMeta{
+                       Name:      "svc",
+                       Namespace: "test",
+               },
+               Subsets: []corev1.EndpointSubset{
+                       {
+                               Ports: []corev1.EndpointPort{
+                                       {
+                                               Name: "port1",
+                                               Port: 9080,
+                                       },
+                                       {
+                                               Name: "port2",
+                                               Port: 9443,
+                                       },
+                               },
+                               Addresses: []corev1.EndpointAddress{
+                                       {IP: "192.168.1.1"},
+                                       {IP: "192.168.1.2"},
+                               },
+                       },
+               },
+       }
+
+       client := fake.NewSimpleClientset()
+       informersFactory := informers.NewSharedInformerFactory(client, 0)
+       svcInformer := informersFactory.Core().V1().Services().Informer()
+       svcLister := informersFactory.Core().V1().Services().Lister()
+
+       processCh := make(chan struct{})
+       svcInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+               AddFunc: func(obj interface{}) {
+                       processCh <- struct{}{}
+               },
+       })
+
+       stopCh := make(chan struct{})
+       defer close(stopCh)
+       go svcInformer.Run(stopCh)
+       cache.WaitForCacheSync(stopCh, svcInformer.HasSynced)
+
+       _, err := client.CoreV1().Services("test").Create(context.Background(), 
svc, metav1.CreateOptions{})
+       assert.Nil(t, err)
+
+       tr := &translator{&TranslatorOptions{
+               ServiceLister: svcLister,
+       }}
+       <-processCh
+
+       nodes, err := tr.TranslateUpstreamNodes(endpoints, 10080)
+       assert.Nil(t, nodes)
+       assert.Equal(t, err, &translateError{
+               field:  "service.spec.ports",
+               reason: "port not defined",
+       })
+
+       nodes, err = tr.TranslateUpstreamNodes(endpoints, 80)
+       assert.Nil(t, err)
+       assert.Equal(t, nodes, []apisixv1.UpstreamNode{
+               {
+                       IP:     "192.168.1.1",
+                       Port:   9080,
+                       Weight: 100,
+               },
+               {
+                       IP:     "192.168.1.2",
+                       Port:   9080,
+                       Weight: 100,
+               },
+       })
+
+       nodes, err = tr.TranslateUpstreamNodes(endpoints, 443)
+       assert.Nil(t, err)
+       assert.Equal(t, nodes, []apisixv1.UpstreamNode{
+               {
+                       IP:     "192.168.1.1",
+                       Port:   9443,
+                       Weight: 100,
+               },
+               {
+                       IP:     "192.168.1.2",
+                       Port:   9443,
+                       Weight: 100,
+               },
+       })
+}
diff --git a/pkg/types/apisix/v1/types.go b/pkg/types/apisix/v1/types.go
index a249ed2..549e39f 100644
--- a/pkg/types/apisix/v1/types.go
+++ b/pkg/types/apisix/v1/types.go
@@ -15,7 +15,9 @@
 package v1
 
 import (
+       "bytes"
        "encoding/json"
+       "strconv"
 )
 
 const (
@@ -40,6 +42,11 @@ const (
        LbEwma = "ewma"
        // LbLeaseConn is the least connection load balancer.
        LbLeastConn = "least_conn"
+
+       // SchemeHTTP represents the HTTP protocol.
+       SchemeHTTP = "http"
+       // SchemeGRPC represents the GRPC protocol.
+       SchemeGRPC = "grpc"
 )
 
 // Metadata contains all meta information about resources.
@@ -101,17 +108,17 @@ type Service struct {
 type Upstream struct {
        Metadata `json:",inline" yaml:",inline"`
 
-       Type     string `json:"type,omitempty" yaml:"type,omitempty"`
-       HashOn   string `json:"hash_on,omitemtpy" yaml:"hash_on,omitempty"`
-       Key      string `json:"key,omitempty" yaml:"key,omitempty"`
-       Nodes    []Node `json:"nodes,omitempty" yaml:"nodes,omitempty"`
-       FromKind string `json:"from_kind,omitempty" yaml:"from_kind,omitempty"`
-       Scheme   string `json:"scheme,omitempty" yaml:"scheme,omitempty"`
+       Type     string         `json:"type,omitempty" yaml:"type,omitempty"`
+       HashOn   string         `json:"hash_on,omitemtpy" 
yaml:"hash_on,omitempty"`
+       Key      string         `json:"key,omitempty" yaml:"key,omitempty"`
+       Nodes    []UpstreamNode `json:"nodes,omitempty" yaml:"nodes,omitempty"`
+       FromKind string         `json:"from_kind,omitempty" 
yaml:"from_kind,omitempty"`
+       Scheme   string         `json:"scheme,omitempty" 
yaml:"scheme,omitempty"`
 }
 
 // Node the node in upstream
 // +k8s:deepcopy-gen=true
-type Node struct {
+type UpstreamNode struct {
        IP     string `json:"ip,omitempty" yaml:"ip,omitempty"`
        Port   int    `json:"port,omitempty" yaml:"port,omitempty"`
        Weight int    `json:"weight,omitempty" yaml:"weight,omitempty"`
@@ -128,3 +135,32 @@ type Ssl struct {
        Status   int      `json:"status,omitempty" yaml:"status,omitempty"`
        Group    string   `json:"group,omitempty" yaml:"group,omitempty"`
 }
+
+// NewDefaultUpstream returns an empty Upstream with default values.
+func NewDefaultUpstream() *Upstream {
+       return &Upstream{
+               Type:     LbRoundRobin,
+               Key:      "",
+               Nodes:    nil,
+               FromKind: "",
+               Scheme:   SchemeHTTP,
+       }
+}
+
+// ComposeUpstreamName uses namespace, name and port info to compose
+// the upstream name.
+func ComposeUpstreamName(namespace, name string, port int32) string {
+       pstr := strconv.Itoa(int(port))
+       // FIXME Use sync.Pool to reuse this buffer if the upstream
+       // name composing code path is hot.
+       p := make([]byte, 0, len(namespace)+len(name)+len(pstr)+2)
+       buf := bytes.NewBuffer(p)
+
+       buf.WriteString(namespace)
+       buf.WriteByte('_')
+       buf.WriteString(name)
+       buf.WriteByte('_')
+       buf.WriteString(pstr)
+
+       return buf.String()
+}
diff --git a/pkg/types/apisix/v1/zz_generated.deepcopy.go 
b/pkg/types/apisix/v1/zz_generated.deepcopy.go
index d8d239b..78ee480 100644
--- a/pkg/types/apisix/v1/zz_generated.deepcopy.go
+++ b/pkg/types/apisix/v1/zz_generated.deepcopy.go
@@ -21,22 +21,6 @@ limitations under the License.
 package v1
 
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, 
writing into out. in must be non-nil.
-func (in *Node) DeepCopyInto(out *Node) {
-       *out = *in
-       return
-}
-
-// DeepCopy is an autogenerated deepcopy function, copying the receiver, 
creating a new Node.
-func (in *Node) DeepCopy() *Node {
-       if in == nil {
-               return nil
-       }
-       out := new(Node)
-       in.DeepCopyInto(out)
-       return out
-}
-
-// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, 
writing into out. in must be non-nil.
 func (in *Route) DeepCopyInto(out *Route) {
        *out = *in
        out.Metadata = in.Metadata
@@ -103,7 +87,7 @@ func (in *Upstream) DeepCopyInto(out *Upstream) {
        out.Metadata = in.Metadata
        if in.Nodes != nil {
                in, out := &in.Nodes, &out.Nodes
-               *out = make([]Node, len(*in))
+               *out = make([]UpstreamNode, len(*in))
                copy(*out, *in)
        }
        return
@@ -118,3 +102,19 @@ func (in *Upstream) DeepCopy() *Upstream {
        in.DeepCopyInto(out)
        return out
 }
+
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, 
writing into out. in must be non-nil.
+func (in *UpstreamNode) DeepCopyInto(out *UpstreamNode) {
+       *out = *in
+       return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, 
creating a new UpstreamNode.
+func (in *UpstreamNode) DeepCopy() *UpstreamNode {
+       if in == nil {
+               return nil
+       }
+       out := new(UpstreamNode)
+       in.DeepCopyInto(out)
+       return out
+}
diff --git a/pkg/types/event.go b/pkg/types/event.go
index 7535ba5..1d68553 100644
--- a/pkg/types/event.go
+++ b/pkg/types/event.go
@@ -42,6 +42,11 @@ func (ev EventType) String() string {
 
 // Event represents a typed event.
 type Event struct {
-       Type   EventType
+       // Type is the type of event.
+       Type EventType
+       // Object is the event subject.
        Object interface{}
+       // Tombstone is the final state before object was delete,
+       // it's useful for DELETE event.
+       Tombstone interface{}
 }
diff --git a/test/e2e/endpoints/endpoints.go b/test/e2e/endpoints/endpoints.go
index 16e30cf..86a1ff2 100644
--- a/test/e2e/endpoints/endpoints.go
+++ b/test/e2e/endpoints/endpoints.go
@@ -26,26 +26,31 @@ import (
 
 var _ = ginkgo.Describe("endpoints", func() {
        s := scaffold.NewDefaultScaffold()
-       ginkgo.It("ignore applied only if there is an ApisixUpstream 
referenced", func() {
+       ginkgo.It("ignore applied only if there is an ApisixRoute referenced", 
func() {
                time.Sleep(5 * time.Second)
                assert.Nil(ginkgo.GinkgoT(), 
s.EnsureNumApisixUpstreamsCreated(0), "checking number of upstreams")
                backendSvc, backendSvcPort := s.DefaultHTTPBackend()
                ups := fmt.Sprintf(`
 apiVersion: apisix.apache.org/v1
-kind: ApisixUpstream
+kind: ApisixRoute
 metadata:
-  name: %s
+  name: httpbin-route
 spec:
-  ports:
-    - port: %d
-      loadbalancer:
-        type: roundrobin
+   rules:
+   - host: httpbin.org
+     http:
+       paths:
+       - backend:
+           serviceName: %s
+           servicePort: %d
+         path: /ip
 `, backendSvc, backendSvcPort[0])
                assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(ups))
                assert.Nil(ginkgo.GinkgoT(), 
s.EnsureNumApisixUpstreamsCreated(1), "checking number of upstreams")
        })
 
        ginkgo.It("upstream nodes should be reset to empty when 
Service/Endpoints was deleted", func() {
+               ginkgo.Skip("now we don't handle endpoints delete event")
                backendSvc, backendSvcPort := s.DefaultHTTPBackend()
                apisixRoute := fmt.Sprintf(`
 apiVersion: apisix.apache.org/v1
@@ -72,3 +77,54 @@ spec:
                s.NewAPISIXClient().GET("/ip").WithHeader("Host", 
"httpbin.com").Expect().Status(http.StatusServiceUnavailable)
        })
 })
+
+var _ = ginkgo.Describe("port usage", func() {
+       s := scaffold.NewScaffold(&scaffold.Options{
+               Name:                    "endpoints-port",
+               Kubeconfig:              scaffold.GetKubeconfig(),
+               APISIXConfigPath:        "testdata/apisix-gw-config.yaml",
+               APISIXDefaultConfigPath: 
"testdata/apisix-gw-config-default.yaml",
+               IngressAPISIXReplicas:   1,
+               HTTPBinServicePort:      8080, // use service port which is 
different from target port (80)
+       })
+       ginkgo.It("service port != target port", func() {
+               backendSvc, backendSvcPort := s.DefaultHTTPBackend()
+               apisixRoute := fmt.Sprintf(`
+apiVersion: apisix.apache.org/v1
+kind: ApisixRoute
+metadata:
+ name: httpbin-route
+spec:
+ rules:
+ - host: httpbin.com
+   http:
+     paths:
+     - backend:
+         serviceName: %s
+         servicePort: %d
+       path: /ip
+`, backendSvc, backendSvcPort[0])
+               assert.Nil(ginkgo.GinkgoT(), 
s.CreateResourceFromString(apisixRoute))
+               time.Sleep(3 * time.Second)
+               ups, err := s.ListApisixUpstreams()
+               assert.Nil(ginkgo.GinkgoT(), err, "listing APISIX upstreams")
+               assert.Len(ginkgo.GinkgoT(), ups, 1)
+               assert.Len(ginkgo.GinkgoT(), ups[0].Nodes, 1)
+
+               // port in nodes is still the targetPort, not the service port
+               assert.Equal(ginkgo.GinkgoT(), ups[0].Nodes[0].Port, 80)
+
+               // scale HTTPBIN, so the endpoints controller has the 
opportunity to update upstream.
+               assert.Nil(ginkgo.GinkgoT(), s.ScaleHTTPBIN(3))
+               time.Sleep(10 * time.Second)
+               ups, err = s.ListApisixUpstreams()
+               assert.Nil(ginkgo.GinkgoT(), err, "listing APISIX upstreams")
+               assert.Len(ginkgo.GinkgoT(), ups, 1)
+               assert.Len(ginkgo.GinkgoT(), ups[0].Nodes, 3)
+
+               // port in nodes is still the targetPort, not the service port
+               assert.Equal(ginkgo.GinkgoT(), ups[0].Nodes[0].Port, 80)
+               assert.Equal(ginkgo.GinkgoT(), ups[0].Nodes[1].Port, 80)
+               assert.Equal(ginkgo.GinkgoT(), ups[0].Nodes[2].Port, 80)
+       })
+})
diff --git a/test/e2e/features/scheme.go b/test/e2e/features/scheme.go
index 6e50e2c..bdcd529 100644
--- a/test/e2e/features/scheme.go
+++ b/test/e2e/features/scheme.go
@@ -59,15 +59,10 @@ kind: ApisixUpstream
 metadata:
   name: grpc-server-service
 spec:
-  ports:
+  portLevelSettings:
     - port: 50051
       scheme: grpc
 `))
-               time.Sleep(2 * time.Second)
-               ups, err := s.ListApisixUpstreams()
-               assert.Nil(ginkgo.GinkgoT(), err)
-               assert.Len(ginkgo.GinkgoT(), ups, 1)
-
                err = s.CreateResourceFromString(`
 apiVersion: apisix.apache.org/v1
 kind: ApisixRoute
@@ -84,6 +79,11 @@ spec:
        path: /helloworld.Greeter/SayHello
 `)
                assert.Nil(ginkgo.GinkgoT(), err)
+               time.Sleep(2 * time.Second)
+               ups, err := s.ListApisixUpstreams()
+               assert.Nil(ginkgo.GinkgoT(), err)
+               assert.Len(ginkgo.GinkgoT(), ups, 1)
+               assert.Equal(ginkgo.GinkgoT(), ups[0].Scheme, "grpc")
 
                // TODO enable the following test cases once APISIX supports 
HTTP/2 in plain.
                //ep, err := s.GetAPISIXEndpoint()
diff --git a/test/e2e/scaffold/scaffold.go b/test/e2e/scaffold/scaffold.go
index ecdb83f..5ab61c8 100644
--- a/test/e2e/scaffold/scaffold.go
+++ b/test/e2e/scaffold/scaffold.go
@@ -45,6 +45,7 @@ type Options struct {
        APISIXConfigPath        string
        APISIXDefaultConfigPath string
        IngressAPISIXReplicas   int
+       HTTPBinServicePort      int
 }
 
 type Scaffold struct {
@@ -106,6 +107,7 @@ func NewDefaultScaffold() *Scaffold {
                APISIXConfigPath:        "testdata/apisix-gw-config.yaml",
                APISIXDefaultConfigPath: 
"testdata/apisix-gw-config-default.yaml",
                IngressAPISIXReplicas:   1,
+               HTTPBinServicePort:      80,
        }
        return NewScaffold(opts)
 }

Reply via email to