This is an automated email from the ASF dual-hosted git repository.
tokers pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix-ingress-controller.git
The following commit(s) were added to refs/heads/master by this push:
new ea3543b optimize endpoints controller (#206)
ea3543b is described below
commit ea3543b7179b06a9fd6c4bf0e36286c7763a9129
Author: Alex Zhang <[email protected]>
AuthorDate: Tue Jan 26 14:19:18 2021 +0800
optimize endpoints controller (#206)
---
pkg/ingress/controller/controller.go | 25 ++-
pkg/ingress/controller/endpoint.go | 300 ++++++++++++++++-------------------
pkg/ingress/controller/watch.go | 103 ------------
test/e2e/e2e.go | 1 +
test/e2e/endpoints/endpoints.go | 46 ++++++
test/e2e/scaffold/apisix.go | 5 +
test/e2e/scaffold/etcd.go | 5 +
test/e2e/scaffold/httpbin.go | 5 +
test/e2e/scaffold/ingress.go | 5 +
9 files changed, 218 insertions(+), 277 deletions(-)
diff --git a/pkg/ingress/controller/controller.go b/pkg/ingress/controller/controller.go
index 2669638..4b86a60 100644
--- a/pkg/ingress/controller/controller.go
+++ b/pkg/ingress/controller/controller.go
@@ -63,6 +63,8 @@ type Controller struct {
metricsCollector metrics.Collector
crdController *Api6Controller
crdInformerFactory externalversions.SharedInformerFactory
+
+ endpointsController *endpointsController
}
// NewController creates an ingress apisix controller object.
@@ -97,6 +99,7 @@ func NewController(cfg *config.Config) (*Controller, error) {
watchingNamespace[ns] = struct{}{}
}
}
+ kube.EndpointsInformer =
kube.CoreSharedInformerFactory.Core().V1().Endpoints()
c := &Controller{
name: podName,
@@ -111,6 +114,7 @@ func NewController(cfg *config.Config) (*Controller, error) {
watchingNamespace: watchingNamespace,
}
+ c.endpointsController =
c.newEndpointsController(kube.CoreSharedInformerFactory)
return c, nil
}
@@ -221,6 +225,12 @@ func (c *Controller) run(ctx context.Context) {
return
}
+ c.goAttach(func() {
+ if err := c.endpointsController.run(ctx); err != nil {
+ log.Errorf("failed to run endpoints controller: %s",
err.Error())
+ }
+ })
+
ac := &Api6Controller{
KubeClientSet: c.clientset,
Api6ClientSet: c.crdClientset,
@@ -228,13 +238,6 @@ func (c *Controller) run(ctx context.Context) {
CoreSharedInformerFactory: kube.CoreSharedInformerFactory,
Stop: ctx.Done(),
}
- epInformer := ac.CoreSharedInformerFactory.Core().V1().Endpoints()
- kube.EndpointsInformer = epInformer
- // endpoint
- ac.Endpoint(c)
- c.goAttach(func() {
- ac.CoreSharedInformerFactory.Start(ctx.Done())
- })
// ApisixRoute
ac.ApisixRoute(c)
@@ -322,11 +325,3 @@ func (api6 *Api6Controller) ApisixTLS(controller *Controller) {
log.Errorf("failed to run ApisixTlsController: %s", err)
}
}
-
-func (api6 *Api6Controller) Endpoint(controller *Controller) {
- ec := BuildEndpointController(api6.KubeClientSet, controller)
- //conf.EndpointsInformer)
- if err := ec.Run(api6.Stop); err != nil {
- log.Errorf("failed to run EndpointController: %s", err)
- }
-}
diff --git a/pkg/ingress/controller/endpoint.go b/pkg/ingress/controller/endpoint.go
index 3d8666f..c7befd0 100644
--- a/pkg/ingress/controller/endpoint.go
+++ b/pkg/ingress/controller/endpoint.go
@@ -16,201 +16,184 @@ package controller
import (
"context"
+ "errors"
"fmt"
- "strconv"
- "time"
-
- "github.com/golang/glog"
- CoreV1 "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/api/errors"
- "k8s.io/apimachinery/pkg/util/runtime"
- "k8s.io/apimachinery/pkg/util/wait"
- "k8s.io/client-go/kubernetes"
- CoreListerV1 "k8s.io/client-go/listers/core/v1"
+
+ "go.uber.org/zap"
+ corev1 "k8s.io/api/core/v1"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
+ "k8s.io/client-go/informers"
+ listerscorev1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
- "github.com/api7/ingress-controller/pkg/kube"
+ "github.com/api7/ingress-controller/pkg/apisix"
+ apisixcache "github.com/api7/ingress-controller/pkg/apisix/cache"
"github.com/api7/ingress-controller/pkg/log"
- sevenConf "github.com/api7/ingress-controller/pkg/seven/conf"
"github.com/api7/ingress-controller/pkg/seven/state"
apisixv1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
)
-type EndpointController struct {
- controller *Controller
- kubeclientset kubernetes.Interface
- endpointList CoreListerV1.EndpointsLister
- endpointSynced cache.InformerSynced
- workqueue workqueue.RateLimitingInterface
+const (
+ _defaultNodeWeight = 100
+)
+
+type endpointsController struct {
+ controller *Controller
+ informer cache.SharedIndexInformer
+ lister listerscorev1.EndpointsLister
+ workqueue workqueue.RateLimitingInterface
}
-func BuildEndpointController(kubeclientset kubernetes.Interface, root
*Controller) *EndpointController {
- controller := &EndpointController{
- controller: root,
- kubeclientset: kubeclientset,
- endpointList: kube.EndpointsInformer.Lister(),
- endpointSynced: kube.EndpointsInformer.Informer().HasSynced,
- workqueue:
workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(),
"endpoints"),
+func (c *Controller) newEndpointsController(factory
informers.SharedInformerFactory) *endpointsController {
+ ctl := &endpointsController{
+ controller: c,
+ informer: factory.Core().V1().Endpoints().Informer(),
+ lister: factory.Core().V1().Endpoints().Lister(),
+ workqueue:
workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(),
"endpoints"),
}
- kube.EndpointsInformer.Informer().AddEventHandler(
+
+ ctl.informer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
- AddFunc: controller.addFunc,
- UpdateFunc: controller.updateFunc,
- DeleteFunc: controller.deleteFunc,
- })
- return controller
-}
+ AddFunc: ctl.onAdd,
+ UpdateFunc: ctl.onUpdate,
+ DeleteFunc: ctl.onDelete,
+ },
+ )
-func (c *EndpointController) Run(stop <-chan struct{}) error {
- // 同步缓存
- if ok := cache.WaitForCacheSync(stop); !ok {
- log.Errorf("同步Endpoint缓存失败")
- return fmt.Errorf("failed to wait for caches to sync")
- }
- go wait.Until(c.runWorker, time.Second, stop)
- return nil
+ return ctl
}
-func (c *EndpointController) runWorker() {
- for c.processNextWorkItem() {
- }
-}
+func (c *endpointsController) run(ctx context.Context) error {
+ log.Info("endpoints controller started")
+ defer log.Info("endpoints controller exited")
-func (c *EndpointController) processNextWorkItem() bool {
- defer recoverException()
- obj, shutdown := c.workqueue.Get()
- if shutdown {
- log.Info("shutdown")
- return false
+ go func() {
+ c.informer.Run(ctx.Done())
+ }()
+
+ if ok := cache.WaitForCacheSync(ctx.Done(), c.informer.HasSynced); !ok {
+ return errors.New("endpoints informers cache sync failed")
}
- err := func(obj interface{}) error {
- defer c.workqueue.Done(obj)
- var key string
- var ok bool
- if key, ok = obj.(string); !ok {
- c.workqueue.Forget(obj)
- return fmt.Errorf("expected string in workqueue but got
%#v", obj)
+ for {
+ obj, shutdown := c.workqueue.Get()
+ if shutdown {
+ return nil
}
- // 在syncHandler中处理业务
- if err := c.syncHandler(key); err != nil {
- return fmt.Errorf("error syncing '%s': %s", key,
err.Error())
+
+ var (
+ err error
+ )
+
+ key, ok := obj.(string)
+ if !ok {
+ log.Errorf("found endpoints object with unexpected type
%T, ignore it", obj)
+ c.workqueue.Forget(obj)
+ } else {
+ err = c.process(ctx, key)
}
- c.workqueue.Forget(obj)
- return nil
- }(obj)
- if err != nil {
- runtime.HandleError(err)
+ c.workqueue.Done(obj)
+
+ if err != nil {
+ log.Warnf("endpoints %s retried since %s", key, err)
+ c.retry(obj)
+ }
}
- return true
}
-func (c *EndpointController) syncHandler(key string) error {
+func (c *endpointsController) process(ctx context.Context, key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
- if name == "cinfoserver" || name == "file-resync2-server" {
- log.Infof("find endpoint %s/%s", namespace, name)
- }
if err != nil {
- log.Errorf("invalid resource key: %s", key)
- return fmt.Errorf("invalid resource key: %s", key)
+ log.Errorf("found endpoints objects with malformed
namespace/name: %s, ignore it", err)
+ return nil
}
- endpointYaml, err := c.endpointList.Endpoints(namespace).Get(name)
+ ep, err := c.lister.Endpoints(namespace).Get(name)
if err != nil {
- if errors.IsNotFound(err) {
- log.Infof("endpoint %s is removed", key)
+ if k8serrors.IsNotFound(err) {
+ log.Warnf("endpoints %s was removed before it can be
processed", key)
return nil
}
- runtime.HandleError(fmt.Errorf("failed to list endpoint %s/%s",
key, err.Error()))
+ log.Errorf("failed to get endpoints %s: %s", key, err)
return err
}
- // endpoint sync
- c.process(endpointYaml)
- return err
+ return c.sync(ctx, ep)
}
-func (c *EndpointController) process(ep *CoreV1.Endpoints) {
- if ep.Namespace != "kube-system" { // todo here is some ignore
namespaces
- for _, s := range ep.Subsets {
- // if upstream need to watch
- // ips
- ips := make([]string, 0)
- for _, address := range s.Addresses {
- ips = append(ips, address.IP)
- }
- // ports
- for _, port := range s.Ports {
- upstreamName := ep.Namespace + "_" + ep.Name +
"_" + strconv.Itoa(int(port.Port))
- // find upstreamName is in apisix
- // default
- syncWithGroup("", upstreamName, ips, port)
- // sync with all apisix group
- for g := range sevenConf.UrlGroup {
- syncWithGroup(g, upstreamName, ips,
port)
- //upstreams, err :=
apisix.ListUpstream(k)
- //if err == nil {
- // for _, upstream := range
upstreams {
- // if *(upstream.Name) ==
upstreamName {
- // nodes :=
make([]*apisixv1.Node, 0)
- // for _, ip :=
range ips {
- //
ipAddress := ip
- // p :=
int(port.Port)
- // weight
:= 100
- // node :=
&apisixv1.Node{IP: &ipAddress, Port: &p, Weight: &weight}
- // nodes =
append(nodes, node)
- // }
- // upstream.Nodes
= nodes
- // // update
upstream nodes
- // // add to seven
solver queue
- //
//apisix.UpdateUpstream(upstream)
- // fromKind :=
WatchFromKind
- //
upstream.FromKind = &fromKind
- // upstreams :=
[]*apisixv1.Upstream{upstream}
- // comb :=
state.ApisixCombination{Routes: nil, Services: nil, Upstreams: upstreams}
- // if _, err =
comb.Solver(); err != nil {
- //
glog.Errorf(err.Error())
- // }
- // }
- // }
- //}
+func (c *endpointsController) sync(ctx context.Context, ep *corev1.Endpoints)
error {
+ clusters := c.controller.apisix.ListClusters()
+ for _, s := range ep.Subsets {
+ for _, port := range s.Ports {
+ upstream := fmt.Sprintf("%s_%s_%d", ep.Namespace,
ep.Name, port.Port)
+ for _, cluster := range clusters {
+ if err := c.syncToCluster(ctx, upstream,
cluster, s.Addresses, int(port.Port)); err != nil {
+ return err
}
}
}
}
+ return nil
}
-func syncWithGroup(group, upstreamName string, ips []string, port
CoreV1.EndpointPort) {
- upstreams, err :=
sevenConf.Client.Cluster(group).Upstream().List(context.TODO())
- if err == nil {
- for _, upstream := range upstreams {
- if upstream.Name == upstreamName {
- nodes := make([]apisixv1.Node, 0)
- for _, ip := range ips {
- node := apisixv1.Node{IP: ip, Port:
int(port.Port), Weight: 100}
- nodes = append(nodes, node)
- }
- upstream.Nodes = nodes
- // update upstream nodes
- // add to seven solver queue
- //apisix.UpdateUpstream(upstream)
- upstream.FromKind = WatchFromKind
- upstreams := []*apisixv1.Upstream{upstream}
- comb := state.ApisixCombination{Routes: nil,
Services: nil, Upstreams: upstreams}
- if _, err = comb.Solver(); err != nil {
- glog.Errorf(err.Error())
- }
- }
+func (c *endpointsController) syncToCluster(ctx context.Context, upstreamName
string,
+ cluster apisix.Cluster, addresses []corev1.EndpointAddress, port int)
error {
+ upstream, err := cluster.Upstream().Get(ctx, upstreamName)
+ if err != nil {
+ if err == apisixcache.ErrNotFound {
+ log.Warnw("upstream is not referenced",
+ zap.String("cluster", cluster.String()),
+ zap.String("upstream", upstreamName),
+ )
+ return nil
+ } else {
+ log.Errorw("failed to get upstream",
+ zap.String("upstream", upstreamName),
+ zap.String("cluster", cluster.String()),
+ zap.Error(err),
+ )
+ return err
}
}
+
+ nodes := make([]apisixv1.Node, 0, len(addresses))
+ for _, address := range addresses {
+ nodes = append(nodes, apisixv1.Node{
+ IP: address.IP,
+ Port: port,
+ Weight: _defaultNodeWeight,
+ })
+ }
+ log.Debugw("upstream binds new nodes",
+ zap.String("upstream", upstreamName),
+ zap.Any("nodes", nodes),
+ )
+
+ upstream.Nodes = nodes
+ upstream.FromKind = WatchFromKind
+ upstreams := []*apisixv1.Upstream{upstream}
+ comb := state.ApisixCombination{Routes: nil, Services: nil, Upstreams:
upstreams}
+
+ if _, err = comb.Solver(); err != nil {
+ log.Errorw("failed to sync upstream",
+ zap.String("upstream", upstreamName),
+ zap.String("cluster", cluster.String()),
+ zap.Error(err),
+ )
+ return err
+ }
+ return nil
}
-func (c *EndpointController) addFunc(obj interface{}) {
- var key string
- var err error
- if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
- runtime.HandleError(err)
+func (c *endpointsController) retry(obj interface{}) {
+ c.workqueue.AddRateLimited(obj)
+}
+
+func (c *endpointsController) onAdd(obj interface{}) {
+ key, err := cache.MetaNamespaceKeyFunc(obj)
+ if err != nil {
+ log.Errorf("found endpoints object with bad namespace/name: %s,
ignore it", err)
return
}
if !c.controller.namespaceWatching(key) {
@@ -219,21 +202,20 @@ func (c *EndpointController) addFunc(obj interface{}) {
c.workqueue.AddRateLimited(key)
}
-func (c *EndpointController) updateFunc(oldObj, newObj interface{}) {
- oldRoute := oldObj.(*CoreV1.Endpoints)
- newRoute := newObj.(*CoreV1.Endpoints)
- if oldRoute.ResourceVersion == newRoute.ResourceVersion {
+func (c *endpointsController) onUpdate(prev, curr interface{}) {
+ prevEp := prev.(*corev1.Endpoints)
+ currEp := curr.(*corev1.Endpoints)
+
+ if prevEp.GetResourceVersion() == currEp.GetResourceVersion() {
return
}
- c.addFunc(newObj)
+ c.onAdd(currEp)
}
-func (c *EndpointController) deleteFunc(obj interface{}) {
- var key string
- var err error
- key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+func (c *endpointsController) onDelete(obj interface{}) {
+ key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
- runtime.HandleError(err)
+ log.Errorf("failed to find the final state before deletion:
%s", err)
return
}
if !c.controller.namespaceWatching(key) {
diff --git a/pkg/ingress/controller/watch.go b/pkg/ingress/controller/watch.go
index e2b4c76..d857ef6 100644
--- a/pkg/ingress/controller/watch.go
+++ b/pkg/ingress/controller/watch.go
@@ -14,112 +14,9 @@
// limitations under the License.
package controller
-import (
- "context"
- "strconv"
-
- "github.com/golang/glog"
- v1 "k8s.io/api/core/v1"
-
- "github.com/api7/ingress-controller/pkg/kube"
- sevenConf "github.com/api7/ingress-controller/pkg/seven/conf"
- "github.com/api7/ingress-controller/pkg/seven/state"
- apisixv1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
-)
-
const (
ADD = "ADD"
UPDATE = "UPDATE"
DELETE = "DELETE"
WatchFromKind = "watch"
)
-
-func Watch() {
- c := &controller{
- queue: make(chan interface{}, 100),
- }
- kube.EndpointsInformer.Informer().AddEventHandler(&QueueEventHandler{c:
c})
- go c.run()
-}
-
-func (c *controller) pop() interface{} {
- e := <-c.queue
- return e
-}
-
-func (c *controller) run() {
- for {
- ele := c.pop()
- c.process(ele)
- }
-}
-
-func (c *controller) process(obj interface{}) {
- qo, _ := obj.(*queueObj)
- ep, _ := qo.Obj.(*v1.Endpoints)
- if ep.Namespace != "kube-system" { // todo here is some ignore
namespaces
- for _, s := range ep.Subsets {
- // if upstream need to watch
- // ips
- ips := make([]string, 0)
- for _, address := range s.Addresses {
- ips = append(ips, address.IP)
- }
- // ports
- for _, port := range s.Ports {
- upstreamName := ep.Namespace + "_" + ep.Name +
"_" + strconv.Itoa(int(port.Port))
- // find upstreamName is in apisix
- // sync with all apisix group
- for _, cluster := range
sevenConf.Client.ListClusters() {
- upstreams, err :=
cluster.Upstream().List(context.TODO())
- if err == nil {
- for _, upstream := range
upstreams {
- if upstream.Name ==
upstreamName {
- nodes :=
make([]apisixv1.Node, 0)
- for _, ip :=
range ips {
- node :=
apisixv1.Node{IP: ip, Port: int(port.Port), Weight: 100}
- nodes =
append(nodes, node)
- }
- upstream.Nodes
= nodes
- // update
upstream nodes
- // add to seven
solver queue
-
//apisix.UpdateUpstream(upstream)
-
upstream.FromKind = WatchFromKind
- upstreams :=
[]*apisixv1.Upstream{upstream}
- comb :=
state.ApisixCombination{Routes: nil, Services: nil, Upstreams: upstreams}
- if _, err =
comb.Solver(); err != nil {
-
glog.Errorf(err.Error())
- }
- }
- }
- }
- }
- }
- }
- }
-}
-
-type controller struct {
- queue chan interface{}
-}
-
-type queueObj struct {
- OpeType string `json:"ope_type"`
- Obj interface{} `json:"obj"`
-}
-
-type QueueEventHandler struct {
- c *controller
-}
-
-func (h *QueueEventHandler) OnAdd(obj interface{}) {
- h.c.queue <- &queueObj{ADD, obj}
-}
-
-func (h *QueueEventHandler) OnDelete(obj interface{}) {
- h.c.queue <- &queueObj{DELETE, obj}
-}
-
-func (h *QueueEventHandler) OnUpdate(old, update interface{}) {
- h.c.queue <- &queueObj{UPDATE, update}
-}
diff --git a/test/e2e/e2e.go b/test/e2e/e2e.go
index 54a408e..bb77051 100644
--- a/test/e2e/e2e.go
+++ b/test/e2e/e2e.go
@@ -20,6 +20,7 @@ import (
"github.com/gruntwork-io/terratest/modules/k8s"
"github.com/onsi/ginkgo"
+ _ "github.com/api7/ingress-controller/test/e2e/endpoints"
_ "github.com/api7/ingress-controller/test/e2e/ingress"
"github.com/api7/ingress-controller/test/e2e/scaffold"
)
diff --git a/test/e2e/endpoints/endpoints.go b/test/e2e/endpoints/endpoints.go
new file mode 100644
index 0000000..39921df
--- /dev/null
+++ b/test/e2e/endpoints/endpoints.go
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package endpoints
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/api7/ingress-controller/test/e2e/scaffold"
+ "github.com/onsi/ginkgo"
+ "github.com/stretchr/testify/assert"
+)
+
+var _ = ginkgo.Describe("endpoints", func() {
+ s := scaffold.NewDefaultScaffold()
+ ginkgo.It("ignore applied only if there is an ApisixUpstream
referenced", func() {
+ time.Sleep(5 * time.Second)
+ assert.Nil(ginkgo.GinkgoT(),
s.EnsureNumApisixUpstreamsCreated(0), "checking number of upstreams")
+ backendSvc, backendSvcPort := s.DefaultHTTPBackend()
+ ups := fmt.Sprintf(`
+apiVersion: apisix.apache.org/v1
+kind: ApisixUpstream
+metadata:
+ name: %s
+spec:
+ ports:
+ - port: %d
+ loadbalancer:
+ type: roundbin
+`, backendSvc, backendSvcPort[0])
+ assert.Nil(ginkgo.GinkgoT(), s.CreateResourceFromString(ups))
+ assert.Nil(ginkgo.GinkgoT(),
s.EnsureNumApisixUpstreamsCreated(1), "checking number of upstreams")
+ })
+})
diff --git a/test/e2e/scaffold/apisix.go b/test/e2e/scaffold/apisix.go
index 37c504b..7915af2 100644
--- a/test/e2e/scaffold/apisix.go
+++ b/test/e2e/scaffold/apisix.go
@@ -209,14 +209,19 @@ func (s *Scaffold) waitAllAPISIXPodsAvailable() error {
return false, nil
}
for _, item := range items {
+ foundPodReady := false
for _, cond := range item.Status.Conditions {
if cond.Type != corev1.PodReady {
continue
}
+ foundPodReady = true
if cond.Status != "True" {
return false, nil
}
}
+ if !foundPodReady {
+ return false, nil
+ }
}
return true, nil
}
diff --git a/test/e2e/scaffold/etcd.go b/test/e2e/scaffold/etcd.go
index 6d9bdc9..de227e1 100644
--- a/test/e2e/scaffold/etcd.go
+++ b/test/e2e/scaffold/etcd.go
@@ -120,14 +120,19 @@ func (s *Scaffold) waitAllEtcdPodsAvailable() error {
return false, nil
}
for _, item := range items {
+ foundPodReady := false
for _, cond := range item.Status.Conditions {
if cond.Type != corev1.PodReady {
continue
}
+ foundPodReady = true
if cond.Status != "True" {
return false, nil
}
}
+ if !foundPodReady {
+ return false, nil
+ }
}
return true, nil
}
diff --git a/test/e2e/scaffold/httpbin.go b/test/e2e/scaffold/httpbin.go
index f9e1f87..eab3a1e 100644
--- a/test/e2e/scaffold/httpbin.go
+++ b/test/e2e/scaffold/httpbin.go
@@ -131,14 +131,19 @@ func (s *Scaffold) WaitAllHTTPBINPoddsAvailable() error {
return false, nil
}
for _, item := range items {
+ foundPodReady := false
for _, cond := range item.Status.Conditions {
if cond.Type != corev1.PodReady {
continue
}
+ foundPodReady = true
if cond.Status != "True" {
return false, nil
}
}
+ if !foundPodReady {
+ return false, nil
+ }
}
return true, nil
}
diff --git a/test/e2e/scaffold/ingress.go b/test/e2e/scaffold/ingress.go
index 6076eb9..546e1f2 100644
--- a/test/e2e/scaffold/ingress.go
+++ b/test/e2e/scaffold/ingress.go
@@ -144,14 +144,19 @@ func (s *Scaffold) waitAllIngressControllerPodsAvailable() error {
return false, nil
}
for _, item := range items {
+ foundPodReady := false
for _, cond := range item.Status.Conditions {
if cond.Type != corev1.PodReady {
continue
}
+ foundPodReady = true
if cond.Status != "True" {
return false, nil
}
}
+ if !foundPodReady {
+ return false, nil
+ }
}
return true, nil
}