This is an automated email from the ASF dual-hosted git repository.

tokers pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git


The following commit(s) were added to refs/heads/master by this push:
     new ea3543b  optimize endpoints controller (#206)
ea3543b is described below

commit ea3543b7179b06a9fd6c4bf0e36286c7763a9129
Author: Alex Zhang <[email protected]>
AuthorDate: Tue Jan 26 14:19:18 2021 +0800

    optimize endpoints controller (#206)
---
 pkg/ingress/controller/controller.go |  25 ++-
 pkg/ingress/controller/endpoint.go   | 300 ++++++++++++++++-------------------
 pkg/ingress/controller/watch.go      | 103 ------------
 test/e2e/e2e.go                      |   1 +
 test/e2e/endpoints/endpoints.go      |  46 ++++++
 test/e2e/scaffold/apisix.go          |   5 +
 test/e2e/scaffold/etcd.go            |   5 +
 test/e2e/scaffold/httpbin.go         |   5 +
 test/e2e/scaffold/ingress.go         |   5 +
 9 files changed, 218 insertions(+), 277 deletions(-)

diff --git a/pkg/ingress/controller/controller.go b/pkg/ingress/controller/controller.go
index 2669638..4b86a60 100644
--- a/pkg/ingress/controller/controller.go
+++ b/pkg/ingress/controller/controller.go
@@ -63,6 +63,8 @@ type Controller struct {
        metricsCollector   metrics.Collector
        crdController      *Api6Controller
        crdInformerFactory externalversions.SharedInformerFactory
+
+       endpointsController *endpointsController
 }
 
 // NewController creates an ingress apisix controller object.
@@ -97,6 +99,7 @@ func NewController(cfg *config.Config) (*Controller, error) {
                        watchingNamespace[ns] = struct{}{}
                }
        }
+       kube.EndpointsInformer = 
kube.CoreSharedInformerFactory.Core().V1().Endpoints()
 
        c := &Controller{
                name:               podName,
@@ -111,6 +114,7 @@ func NewController(cfg *config.Config) (*Controller, error) {
                watchingNamespace:  watchingNamespace,
        }
 
+       c.endpointsController = 
c.newEndpointsController(kube.CoreSharedInformerFactory)
        return c, nil
 }
 
@@ -221,6 +225,12 @@ func (c *Controller) run(ctx context.Context) {
                return
        }
 
+       c.goAttach(func() {
+               if err := c.endpointsController.run(ctx); err != nil {
+                       log.Errorf("failed to run endpoints controller: %s", 
err.Error())
+               }
+       })
+
        ac := &Api6Controller{
                KubeClientSet:             c.clientset,
                Api6ClientSet:             c.crdClientset,
@@ -228,13 +238,6 @@ func (c *Controller) run(ctx context.Context) {
                CoreSharedInformerFactory: kube.CoreSharedInformerFactory,
                Stop:                      ctx.Done(),
        }
-       epInformer := ac.CoreSharedInformerFactory.Core().V1().Endpoints()
-       kube.EndpointsInformer = epInformer
-       // endpoint
-       ac.Endpoint(c)
-       c.goAttach(func() {
-               ac.CoreSharedInformerFactory.Start(ctx.Done())
-       })
 
        // ApisixRoute
        ac.ApisixRoute(c)
@@ -322,11 +325,3 @@ func (api6 *Api6Controller) ApisixTLS(controller *Controller) {
                log.Errorf("failed to run ApisixTlsController: %s", err)
        }
 }
-
-func (api6 *Api6Controller) Endpoint(controller *Controller) {
-       ec := BuildEndpointController(api6.KubeClientSet, controller)
-       //conf.EndpointsInformer)
-       if err := ec.Run(api6.Stop); err != nil {
-               log.Errorf("failed to run EndpointController: %s", err)
-       }
-}
diff --git a/pkg/ingress/controller/endpoint.go b/pkg/ingress/controller/endpoint.go
index 3d8666f..c7befd0 100644
--- a/pkg/ingress/controller/endpoint.go
+++ b/pkg/ingress/controller/endpoint.go
@@ -16,201 +16,184 @@ package controller
 
 import (
        "context"
+       "errors"
        "fmt"
-       "strconv"
-       "time"
-
-       "github.com/golang/glog"
-       CoreV1 "k8s.io/api/core/v1"
-       "k8s.io/apimachinery/pkg/api/errors"
-       "k8s.io/apimachinery/pkg/util/runtime"
-       "k8s.io/apimachinery/pkg/util/wait"
-       "k8s.io/client-go/kubernetes"
-       CoreListerV1 "k8s.io/client-go/listers/core/v1"
+
+       "go.uber.org/zap"
+       corev1 "k8s.io/api/core/v1"
+       k8serrors "k8s.io/apimachinery/pkg/api/errors"
+       "k8s.io/client-go/informers"
+       listerscorev1 "k8s.io/client-go/listers/core/v1"
        "k8s.io/client-go/tools/cache"
        "k8s.io/client-go/util/workqueue"
 
-       "github.com/api7/ingress-controller/pkg/kube"
+       "github.com/api7/ingress-controller/pkg/apisix"
+       apisixcache "github.com/api7/ingress-controller/pkg/apisix/cache"
        "github.com/api7/ingress-controller/pkg/log"
-       sevenConf "github.com/api7/ingress-controller/pkg/seven/conf"
        "github.com/api7/ingress-controller/pkg/seven/state"
        apisixv1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
 )
 
-type EndpointController struct {
-       controller     *Controller
-       kubeclientset  kubernetes.Interface
-       endpointList   CoreListerV1.EndpointsLister
-       endpointSynced cache.InformerSynced
-       workqueue      workqueue.RateLimitingInterface
+const (
+       _defaultNodeWeight = 100
+)
+
+type endpointsController struct {
+       controller *Controller
+       informer   cache.SharedIndexInformer
+       lister     listerscorev1.EndpointsLister
+       workqueue  workqueue.RateLimitingInterface
 }
 
-func BuildEndpointController(kubeclientset kubernetes.Interface, root 
*Controller) *EndpointController {
-       controller := &EndpointController{
-               controller:     root,
-               kubeclientset:  kubeclientset,
-               endpointList:   kube.EndpointsInformer.Lister(),
-               endpointSynced: kube.EndpointsInformer.Informer().HasSynced,
-               workqueue:      
workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), 
"endpoints"),
+func (c *Controller) newEndpointsController(factory 
informers.SharedInformerFactory) *endpointsController {
+       ctl := &endpointsController{
+               controller: c,
+               informer:   factory.Core().V1().Endpoints().Informer(),
+               lister:     factory.Core().V1().Endpoints().Lister(),
+               workqueue:  
workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), 
"endpoints"),
        }
-       kube.EndpointsInformer.Informer().AddEventHandler(
+
+       ctl.informer.AddEventHandler(
                cache.ResourceEventHandlerFuncs{
-                       AddFunc:    controller.addFunc,
-                       UpdateFunc: controller.updateFunc,
-                       DeleteFunc: controller.deleteFunc,
-               })
-       return controller
-}
+                       AddFunc:    ctl.onAdd,
+                       UpdateFunc: ctl.onUpdate,
+                       DeleteFunc: ctl.onDelete,
+               },
+       )
 
-func (c *EndpointController) Run(stop <-chan struct{}) error {
-       // 同步缓存
-       if ok := cache.WaitForCacheSync(stop); !ok {
-               log.Errorf("同步Endpoint缓存失败")
-               return fmt.Errorf("failed to wait for caches to sync")
-       }
-       go wait.Until(c.runWorker, time.Second, stop)
-       return nil
+       return ctl
 }
 
-func (c *EndpointController) runWorker() {
-       for c.processNextWorkItem() {
-       }
-}
+func (c *endpointsController) run(ctx context.Context) error {
+       log.Info("endpoints controller started")
+       defer log.Info("endpoints controller exited")
 
-func (c *EndpointController) processNextWorkItem() bool {
-       defer recoverException()
-       obj, shutdown := c.workqueue.Get()
-       if shutdown {
-               log.Info("shutdown")
-               return false
+       go func() {
+               c.informer.Run(ctx.Done())
+       }()
+
+       if ok := cache.WaitForCacheSync(ctx.Done(), c.informer.HasSynced); !ok {
+               return errors.New("endpoints informers cache sync failed")
        }
-       err := func(obj interface{}) error {
-               defer c.workqueue.Done(obj)
-               var key string
-               var ok bool
 
-               if key, ok = obj.(string); !ok {
-                       c.workqueue.Forget(obj)
-                       return fmt.Errorf("expected string in workqueue but got 
%#v", obj)
+       for {
+               obj, shutdown := c.workqueue.Get()
+               if shutdown {
+                       return nil
                }
-               // 在syncHandler中处理业务
-               if err := c.syncHandler(key); err != nil {
-                       return fmt.Errorf("error syncing '%s': %s", key, 
err.Error())
+
+               var (
+                       err error
+               )
+
+               key, ok := obj.(string)
+               if !ok {
+                       log.Errorf("found endpoints object with unexpected type 
%T, ignore it", obj)
+                       c.workqueue.Forget(obj)
+               } else {
+                       err = c.process(ctx, key)
                }
 
-               c.workqueue.Forget(obj)
-               return nil
-       }(obj)
-       if err != nil {
-               runtime.HandleError(err)
+               c.workqueue.Done(obj)
+
+               if err != nil {
+                       log.Warnf("endpoints %s retried since %s", key, err)
+                       c.retry(obj)
+               }
        }
-       return true
 }
 
-func (c *EndpointController) syncHandler(key string) error {
+func (c *endpointsController) process(ctx context.Context, key string) error {
        namespace, name, err := cache.SplitMetaNamespaceKey(key)
-       if name == "cinfoserver" || name == "file-resync2-server" {
-               log.Infof("find endpoint %s/%s", namespace, name)
-       }
        if err != nil {
-               log.Errorf("invalid resource key: %s", key)
-               return fmt.Errorf("invalid resource key: %s", key)
+               log.Errorf("found endpoints objects with malformed 
namespace/name: %s, ignore it", err)
+               return nil
        }
 
-       endpointYaml, err := c.endpointList.Endpoints(namespace).Get(name)
+       ep, err := c.lister.Endpoints(namespace).Get(name)
        if err != nil {
-               if errors.IsNotFound(err) {
-                       log.Infof("endpoint %s is removed", key)
+               if k8serrors.IsNotFound(err) {
+                       log.Warnf("endpoints %s was removed before it can be 
processed", key)
                        return nil
                }
-               runtime.HandleError(fmt.Errorf("failed to list endpoint %s/%s", 
key, err.Error()))
+               log.Errorf("failed to get endpoints %s: %s", key, err)
                return err
        }
-       // endpoint sync
-       c.process(endpointYaml)
-       return err
+       return c.sync(ctx, ep)
 }
 
-func (c *EndpointController) process(ep *CoreV1.Endpoints) {
-       if ep.Namespace != "kube-system" { // todo here is some ignore 
namespaces
-               for _, s := range ep.Subsets {
-                       // if upstream need to watch
-                       // ips
-                       ips := make([]string, 0)
-                       for _, address := range s.Addresses {
-                               ips = append(ips, address.IP)
-                       }
-                       // ports
-                       for _, port := range s.Ports {
-                               upstreamName := ep.Namespace + "_" + ep.Name + 
"_" + strconv.Itoa(int(port.Port))
-                               // find upstreamName is in apisix
-                               // default
-                               syncWithGroup("", upstreamName, ips, port)
-                               // sync with all apisix group
-                               for g := range sevenConf.UrlGroup {
-                                       syncWithGroup(g, upstreamName, ips, 
port)
-                                       //upstreams, err :=  
apisix.ListUpstream(k)
-                                       //if err == nil {
-                                       //      for _, upstream := range 
upstreams {
-                                       //              if *(upstream.Name) == 
upstreamName {
-                                       //                      nodes := 
make([]*apisixv1.Node, 0)
-                                       //                      for _, ip := 
range ips {
-                                       //                              
ipAddress := ip
-                                       //                              p := 
int(port.Port)
-                                       //                              weight 
:= 100
-                                       //                              node := 
&apisixv1.Node{IP: &ipAddress, Port: &p, Weight: &weight}
-                                       //                              nodes = 
append(nodes, node)
-                                       //                      }
-                                       //                      upstream.Nodes 
= nodes
-                                       //                      // update 
upstream nodes
-                                       //                      // add to seven 
solver queue
-                                       //                      
//apisix.UpdateUpstream(upstream)
-                                       //                      fromKind := 
WatchFromKind
-                                       //                      
upstream.FromKind = &fromKind
-                                       //                      upstreams := 
[]*apisixv1.Upstream{upstream}
-                                       //                      comb := 
state.ApisixCombination{Routes: nil, Services: nil, Upstreams: upstreams}
-                                       //                      if _, err = 
comb.Solver(); err != nil {
-                                       //                              
glog.Errorf(err.Error())
-                                       //                      }
-                                       //              }
-                                       //      }
-                                       //}
+func (c *endpointsController) sync(ctx context.Context, ep *corev1.Endpoints) 
error {
+       clusters := c.controller.apisix.ListClusters()
+       for _, s := range ep.Subsets {
+               for _, port := range s.Ports {
+                       upstream := fmt.Sprintf("%s_%s_%d", ep.Namespace, 
ep.Name, port.Port)
+                       for _, cluster := range clusters {
+                               if err := c.syncToCluster(ctx, upstream, 
cluster, s.Addresses, int(port.Port)); err != nil {
+                                       return err
                                }
                        }
                }
        }
+       return nil
 }
 
-func syncWithGroup(group, upstreamName string, ips []string, port 
CoreV1.EndpointPort) {
-       upstreams, err := 
sevenConf.Client.Cluster(group).Upstream().List(context.TODO())
-       if err == nil {
-               for _, upstream := range upstreams {
-                       if upstream.Name == upstreamName {
-                               nodes := make([]apisixv1.Node, 0)
-                               for _, ip := range ips {
-                                       node := apisixv1.Node{IP: ip, Port: 
int(port.Port), Weight: 100}
-                                       nodes = append(nodes, node)
-                               }
-                               upstream.Nodes = nodes
-                               // update upstream nodes
-                               // add to seven solver queue
-                               //apisix.UpdateUpstream(upstream)
-                               upstream.FromKind = WatchFromKind
-                               upstreams := []*apisixv1.Upstream{upstream}
-                               comb := state.ApisixCombination{Routes: nil, 
Services: nil, Upstreams: upstreams}
-                               if _, err = comb.Solver(); err != nil {
-                                       glog.Errorf(err.Error())
-                               }
-                       }
+func (c *endpointsController) syncToCluster(ctx context.Context, upstreamName 
string,
+       cluster apisix.Cluster, addresses []corev1.EndpointAddress, port int) 
error {
+       upstream, err := cluster.Upstream().Get(ctx, upstreamName)
+       if err != nil {
+               if err == apisixcache.ErrNotFound {
+                       log.Warnw("upstream is not referenced",
+                               zap.String("cluster", cluster.String()),
+                               zap.String("upstream", upstreamName),
+                       )
+                       return nil
+               } else {
+                       log.Errorw("failed to get upstream",
+                               zap.String("upstream", upstreamName),
+                               zap.String("cluster", cluster.String()),
+                               zap.Error(err),
+                       )
+                       return err
                }
        }
+
+       nodes := make([]apisixv1.Node, 0, len(addresses))
+       for _, address := range addresses {
+               nodes = append(nodes, apisixv1.Node{
+                       IP:     address.IP,
+                       Port:   port,
+                       Weight: _defaultNodeWeight,
+               })
+       }
+       log.Debugw("upstream binds new nodes",
+               zap.String("upstream", upstreamName),
+               zap.Any("nodes", nodes),
+       )
+
+       upstream.Nodes = nodes
+       upstream.FromKind = WatchFromKind
+       upstreams := []*apisixv1.Upstream{upstream}
+       comb := state.ApisixCombination{Routes: nil, Services: nil, Upstreams: 
upstreams}
+
+       if _, err = comb.Solver(); err != nil {
+               log.Errorw("failed to sync upstream",
+                       zap.String("upstream", upstreamName),
+                       zap.String("cluster", cluster.String()),
+                       zap.Error(err),
+               )
+               return err
+       }
+       return nil
 }
 
-func (c *EndpointController) addFunc(obj interface{}) {
-       var key string
-       var err error
-       if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
-               runtime.HandleError(err)
+func (c *endpointsController) retry(obj interface{}) {
+       c.workqueue.AddRateLimited(obj)
+}
+
+func (c *endpointsController) onAdd(obj interface{}) {
+       key, err := cache.MetaNamespaceKeyFunc(obj)
+       if err != nil {
+               log.Errorf("found endpoints object with bad namespace/name: %s, 
ignore it", err)
                return
        }
        if !c.controller.namespaceWatching(key) {
@@ -219,21 +202,20 @@ func (c *EndpointController) addFunc(obj interface{}) {
        c.workqueue.AddRateLimited(key)
 }
 
-func (c *EndpointController) updateFunc(oldObj, newObj interface{}) {
-       oldRoute := oldObj.(*CoreV1.Endpoints)
-       newRoute := newObj.(*CoreV1.Endpoints)
-       if oldRoute.ResourceVersion == newRoute.ResourceVersion {
+func (c *endpointsController) onUpdate(prev, curr interface{}) {
+       prevEp := prev.(*corev1.Endpoints)
+       currEp := curr.(*corev1.Endpoints)
+
+       if prevEp.GetResourceVersion() == currEp.GetResourceVersion() {
                return
        }
-       c.addFunc(newObj)
+       c.onAdd(currEp)
 }
 
-func (c *EndpointController) deleteFunc(obj interface{}) {
-       var key string
-       var err error
-       key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+func (c *endpointsController) onDelete(obj interface{}) {
+       key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
        if err != nil {
-               runtime.HandleError(err)
+               log.Errorf("failed to find the final state before deletion: 
%s", err)
                return
        }
        if !c.controller.namespaceWatching(key) {
diff --git a/pkg/ingress/controller/watch.go b/pkg/ingress/controller/watch.go
index e2b4c76..d857ef6 100644
--- a/pkg/ingress/controller/watch.go
+++ b/pkg/ingress/controller/watch.go
@@ -14,112 +14,9 @@
 // limitations under the License.
 package controller
 
-import (
-       "context"
-       "strconv"
-
-       "github.com/golang/glog"
-       v1 "k8s.io/api/core/v1"
-
-       "github.com/api7/ingress-controller/pkg/kube"
-       sevenConf "github.com/api7/ingress-controller/pkg/seven/conf"
-       "github.com/api7/ingress-controller/pkg/seven/state"
-       apisixv1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
-)
-
 const (
        ADD           = "ADD"
        UPDATE        = "UPDATE"
        DELETE        = "DELETE"
        WatchFromKind = "watch"
 )
-
-func Watch() {
-       c := &controller{
-               queue: make(chan interface{}, 100),
-       }
-       kube.EndpointsInformer.Informer().AddEventHandler(&QueueEventHandler{c: 
c})
-       go c.run()
-}
-
-func (c *controller) pop() interface{} {
-       e := <-c.queue
-       return e
-}
-
-func (c *controller) run() {
-       for {
-               ele := c.pop()
-               c.process(ele)
-       }
-}
-
-func (c *controller) process(obj interface{}) {
-       qo, _ := obj.(*queueObj)
-       ep, _ := qo.Obj.(*v1.Endpoints)
-       if ep.Namespace != "kube-system" { // todo here is some ignore 
namespaces
-               for _, s := range ep.Subsets {
-                       // if upstream need to watch
-                       // ips
-                       ips := make([]string, 0)
-                       for _, address := range s.Addresses {
-                               ips = append(ips, address.IP)
-                       }
-                       // ports
-                       for _, port := range s.Ports {
-                               upstreamName := ep.Namespace + "_" + ep.Name + 
"_" + strconv.Itoa(int(port.Port))
-                               // find upstreamName is in apisix
-                               // sync with all apisix group
-                               for _, cluster := range 
sevenConf.Client.ListClusters() {
-                                       upstreams, err := 
cluster.Upstream().List(context.TODO())
-                                       if err == nil {
-                                               for _, upstream := range 
upstreams {
-                                                       if upstream.Name == 
upstreamName {
-                                                               nodes := 
make([]apisixv1.Node, 0)
-                                                               for _, ip := 
range ips {
-                                                                       node := 
apisixv1.Node{IP: ip, Port: int(port.Port), Weight: 100}
-                                                                       nodes = 
append(nodes, node)
-                                                               }
-                                                               upstream.Nodes 
= nodes
-                                                               // update 
upstream nodes
-                                                               // add to seven 
solver queue
-                                                               
//apisix.UpdateUpstream(upstream)
-                                                               
upstream.FromKind = WatchFromKind
-                                                               upstreams := 
[]*apisixv1.Upstream{upstream}
-                                                               comb := 
state.ApisixCombination{Routes: nil, Services: nil, Upstreams: upstreams}
-                                                               if _, err = 
comb.Solver(); err != nil {
-                                                                       
glog.Errorf(err.Error())
-                                                               }
-                                                       }
-                                               }
-                                       }
-                               }
-                       }
-               }
-       }
-}
-
-type controller struct {
-       queue chan interface{}
-}
-
-type queueObj struct {
-       OpeType string      `json:"ope_type"`
-       Obj     interface{} `json:"obj"`
-}
-
-type QueueEventHandler struct {
-       c *controller
-}
-
-func (h *QueueEventHandler) OnAdd(obj interface{}) {
-       h.c.queue <- &queueObj{ADD, obj}
-}
-
-func (h *QueueEventHandler) OnDelete(obj interface{}) {
-       h.c.queue <- &queueObj{DELETE, obj}
-}
-
-func (h *QueueEventHandler) OnUpdate(old, update interface{}) {
-       h.c.queue <- &queueObj{UPDATE, update}
-}
diff --git a/test/e2e/e2e.go b/test/e2e/e2e.go
index 54a408e..bb77051 100644
--- a/test/e2e/e2e.go
+++ b/test/e2e/e2e.go
@@ -20,6 +20,7 @@ import (
        "github.com/gruntwork-io/terratest/modules/k8s"
        "github.com/onsi/ginkgo"
 
+       _ "github.com/api7/ingress-controller/test/e2e/endpoints"
        _ "github.com/api7/ingress-controller/test/e2e/ingress"
        "github.com/api7/ingress-controller/test/e2e/scaffold"
 )
diff --git a/test/e2e/endpoints/endpoints.go b/test/e2e/endpoints/endpoints.go
new file mode 100644
index 0000000..39921df
--- /dev/null
+++ b/test/e2e/endpoints/endpoints.go
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package endpoints
+
+import (
+       "fmt"
+       "time"
+
+       "github.com/api7/ingress-controller/test/e2e/scaffold"
+       "github.com/onsi/ginkgo"
+       "github.com/stretchr/testify/assert"
+)
+
+var _ = ginkgo.Describe("endpoints", func() {
+       s := scaffold.NewDefaultScaffold()
+       ginkgo.It("ignore applied only if there is an ApisixUpstream 
referenced", func() {
+               time.Sleep(5 * time.Second)
+               assert.Nil(ginkgo.GinkgoT(), 
s.EnsureNumApisixUpstreamsCreated(0), "checking number of upstreams")
+               backendSvc, backendSvcPort := s.DefaultHTTPBackend()
+               ups := fmt.Sprintf(`
+apiVersion: apisix.apache.org/v1
+kind: ApisixUpstream
+metadata:
+  name: %s
+spec:
+  ports:
+    - port: %d
+      loadbalancer:
+        type: roundbin
+`, backendSvc, backendSvcPort[0])
+               assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(ups))
+               assert.Nil(ginkgo.GinkgoT(), 
s.EnsureNumApisixUpstreamsCreated(1), "checking number of upstreams")
+       })
+})
diff --git a/test/e2e/scaffold/apisix.go b/test/e2e/scaffold/apisix.go
index 37c504b..7915af2 100644
--- a/test/e2e/scaffold/apisix.go
+++ b/test/e2e/scaffold/apisix.go
@@ -209,14 +209,19 @@ func (s *Scaffold) waitAllAPISIXPodsAvailable() error {
                        return false, nil
                }
                for _, item := range items {
+                       foundPodReady := false
                        for _, cond := range item.Status.Conditions {
                                if cond.Type != corev1.PodReady {
                                        continue
                                }
+                               foundPodReady = true
                                if cond.Status != "True" {
                                        return false, nil
                                }
                        }
+                       if !foundPodReady {
+                               return false, nil
+                       }
                }
                return true, nil
        }
diff --git a/test/e2e/scaffold/etcd.go b/test/e2e/scaffold/etcd.go
index 6d9bdc9..de227e1 100644
--- a/test/e2e/scaffold/etcd.go
+++ b/test/e2e/scaffold/etcd.go
@@ -120,14 +120,19 @@ func (s *Scaffold) waitAllEtcdPodsAvailable() error {
                        return false, nil
                }
                for _, item := range items {
+                       foundPodReady := false
                        for _, cond := range item.Status.Conditions {
                                if cond.Type != corev1.PodReady {
                                        continue
                                }
+                               foundPodReady = true
                                if cond.Status != "True" {
                                        return false, nil
                                }
                        }
+                       if !foundPodReady {
+                               return false, nil
+                       }
                }
                return true, nil
        }
diff --git a/test/e2e/scaffold/httpbin.go b/test/e2e/scaffold/httpbin.go
index f9e1f87..eab3a1e 100644
--- a/test/e2e/scaffold/httpbin.go
+++ b/test/e2e/scaffold/httpbin.go
@@ -131,14 +131,19 @@ func (s *Scaffold) WaitAllHTTPBINPoddsAvailable() error {
                        return false, nil
                }
                for _, item := range items {
+                       foundPodReady := false
                        for _, cond := range item.Status.Conditions {
                                if cond.Type != corev1.PodReady {
                                        continue
                                }
+                               foundPodReady = true
                                if cond.Status != "True" {
                                        return false, nil
                                }
                        }
+                       if !foundPodReady {
+                               return false, nil
+                       }
                }
                return true, nil
        }
diff --git a/test/e2e/scaffold/ingress.go b/test/e2e/scaffold/ingress.go
index 6076eb9..546e1f2 100644
--- a/test/e2e/scaffold/ingress.go
+++ b/test/e2e/scaffold/ingress.go
@@ -144,14 +144,19 @@ func (s *Scaffold) waitAllIngressControllerPodsAvailable() error {
                        return false, nil
                }
                for _, item := range items {
+                       foundPodReady := false
                        for _, cond := range item.Status.Conditions {
                                if cond.Type != corev1.PodReady {
                                        continue
                                }
+                               foundPodReady = true
                                if cond.Status != "True" {
                                        return false, nil
                                }
                        }
+                       if !foundPodReady {
+                               return false, nil
+                       }
                }
                return true, nil
        }

Reply via email to