tokers commented on a change in pull request #149:
URL: 
https://github.com/apache/apisix-ingress-controller/pull/149#discussion_r551097615



##########
File path: pkg/seven/state/builder.go
##########
@@ -20,10 +20,12 @@ import (
 
        "github.com/golang/glog"
 
+       "context"
        "github.com/api7/ingress-controller/pkg/seven/apisix"
        "github.com/api7/ingress-controller/pkg/seven/db"
        "github.com/api7/ingress-controller/pkg/seven/utils"
        v1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
+       "sync"

Review comment:
       Style: put standard library packages at the top of the import block.

##########
File path: pkg/seven/state/builder.go
##########
@@ -84,161 +86,198 @@ func paddingUpstream(upstream *v1.Upstream, 
currentUpstream *v1.Upstream) {
 // NewRouteWorkers make routeWrokers group by service per CRD
 // 1.make routes group by (1_2_3) it may be a map like map[1_2_3][]Route;
 // 2.route is listenning Event from the ready of 1_2_3;
-func NewRouteWorkers(routes []*v1.Route) RouteWorkerGroup {
+func NewRouteWorkers(ctx context.Context, routes []*v1.Route, wg 
*sync.WaitGroup, errorChan chan CRDStatus) RouteWorkerGroup {
        rwg := make(RouteWorkerGroup)
        for _, r := range routes {
-               quit := make(chan Quit)
-               rw := &routeWorker{Route: r, Quit: quit}
+               rw := &routeWorker{Route: r, Ctx: ctx, Wg: wg, ErrorChan: 
errorChan}
                rw.start()
                rwg.Add(*r.ServiceName, rw)
        }
        return rwg
 }
 
 // 3.route get the Event and trigger a padding for object,then diff,sync;
-func (r *routeWorker) trigger(event Event) error {
-       defer close(r.Quit)
+func (r *routeWorker) trigger(event Event) {
+       var errNotify error
+       defer func(err error) {
+               if err != nil {
+                       r.ErrorChan <- CRDStatus{Id: "", Status: "failure", 
Err: err}
+               }
+               r.Wg.Done()
+       }(errNotify)

Review comment:
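       Here you should capture `errNotify` as a closure variable rather than 
pass it as a function parameter, since the arguments of a deferred function 
are evaluated when the defer statement runs (`runtime.deferproc`), not when 
the deferred function runs (`runtime.deferreturn`). As written, the deferred 
function always receives `nil`, so the error is never reported.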

##########
File path: pkg/seven/state/builder.go
##########
@@ -84,161 +86,198 @@ func paddingUpstream(upstream *v1.Upstream, 
currentUpstream *v1.Upstream) {
 // NewRouteWorkers make routeWrokers group by service per CRD
 // 1.make routes group by (1_2_3) it may be a map like map[1_2_3][]Route;
 // 2.route is listenning Event from the ready of 1_2_3;
-func NewRouteWorkers(routes []*v1.Route) RouteWorkerGroup {
+func NewRouteWorkers(ctx context.Context, routes []*v1.Route, wg 
*sync.WaitGroup, errorChan chan CRDStatus) RouteWorkerGroup {
        rwg := make(RouteWorkerGroup)
        for _, r := range routes {
-               quit := make(chan Quit)
-               rw := &routeWorker{Route: r, Quit: quit}
+               rw := &routeWorker{Route: r, Ctx: ctx, Wg: wg, ErrorChan: 
errorChan}
                rw.start()
                rwg.Add(*r.ServiceName, rw)
        }
        return rwg
 }
 
 // 3.route get the Event and trigger a padding for object,then diff,sync;
-func (r *routeWorker) trigger(event Event) error {
-       defer close(r.Quit)
+func (r *routeWorker) trigger(event Event) {
+       var errNotify error
+       defer func(err error) {
+               if err != nil {
+                       r.ErrorChan <- CRDStatus{Id: "", Status: "failure", 
Err: err}
+               }
+               r.Wg.Done()
+       }(errNotify)
        // consumer Event
        service := event.Obj.(*v1.Service)
        r.ServiceId = service.ID
        glog.V(2).Infof("trigger routeWorker %s from %s, %s", *r.Name, 
event.Op, *service.Name)
 
        // padding
-       currentRoute, _ := apisix.FindCurrentRoute(r.Route)
+       currentRoute, err := apisix.FindCurrentRoute(r.Route)
+       if err != nil && err.Error() != "NOT FOUND" {
+               errNotify = err
+               return
+       }
        paddingRoute(r.Route, currentRoute)
        // diff
        hasDiff, err := utils.HasDiff(r.Route, currentRoute)
        // sync
        if err != nil {
-               return err
+               errNotify = err
+               return
        }
        if hasDiff {
-               r.sync()
+               err := r.sync()
+               if err != nil {
+                       errNotify = err
+                       return
+               }
        }
-       // todo broadcast
-
-       return nil
 }
 
 // sync
-func (r *routeWorker) sync() {
+func (r *routeWorker) sync() error {
        if *r.Route.ID != strconv.Itoa(0) {
                // 1. sync memDB
                db := &db.RouteDB{Routes: []*v1.Route{r.Route}}
                if err := db.UpdateRoute(); err != nil {
                        glog.Errorf("update route failed, route: %#v, err: 
%+v", r.Route, err)
-                       return
+                       return err
                }
                // 2. sync apisix
-               apisix.UpdateRoute(r.Route)
+               if err := apisix.UpdateRoute(r.Route); err != nil {
+                       return err
+               }
                glog.V(2).Infof("update route %s, %s", *r.Name, *r.ServiceId)
        } else {
                // 1. sync apisix and get id
                if res, err := apisix.AddRoute(r.Route); err != nil {
                        glog.Errorf("add route failed, route: %#v, err: %+v", 
r.Route, err)
-                       return
+                       return err
                } else {
                        key := res.Route.Key
                        tmp := strings.Split(*key, "/")
                        *r.ID = tmp[len(tmp)-1]
                }
                // 2. sync memDB
                db := &db.RouteDB{Routes: []*v1.Route{r.Route}}
-               db.Insert()
+               if err := db.Insert(); err != nil {
+                       return err
+               }
                glog.V(2).Infof("create route %s, %s", *r.Name, *r.ServiceId)
        }
+       return nil
 }
 
 // service
-func NewServiceWorkers(services []*v1.Service, rwg *RouteWorkerGroup) 
ServiceWorkerGroup {
+func NewServiceWorkers(ctx context.Context, services []*v1.Service, rwg 
*RouteWorkerGroup, wg *sync.WaitGroup, errorChan chan CRDStatus) 
ServiceWorkerGroup {
        swg := make(ServiceWorkerGroup)
        for _, s := range services {
-               quit := make(chan Quit)
-               rw := &serviceWorker{Service: s, Quit: quit}
+               rw := &serviceWorker{Service: s, Ctx: ctx, Wg: wg, ErrorChan: 
errorChan}
+               //rw.Wg.Add(1)
                rw.start(rwg)
                swg.Add(*s.UpstreamName, rw)
        }
        return swg
 }
 
 // upstream
-func SolverUpstream(upstreams []*v1.Upstream, swg ServiceWorkerGroup) {
+func SolverUpstream(upstreams []*v1.Upstream, swg ServiceWorkerGroup, wg 
*sync.WaitGroup, errorChan chan CRDStatus) {
        for _, u := range upstreams {
-               op := Update
-               if currentUpstream, err := apisix.FindCurrentUpstream(*u.Group, 
*u.Name, *u.FullName); err != nil {
-                       glog.Errorf("solver upstream failed, find upstream from 
etcd failed, upstream: %+v, err: %+v", u, err)
+               go SolverSingleUpstream(u, swg, wg, errorChan)
+       }
+}
+
+func SolverSingleUpstream(u *v1.Upstream, swg ServiceWorkerGroup, wg 
*sync.WaitGroup, errorChan chan CRDStatus) {
+       var errNotify error
+       defer func(err error) {
+               if err != nil {
+                       errorChan <- CRDStatus{Id: "", Status: "failure", Err: 
err}
+               }
+               wg.Done()
+       }(errNotify)
+       op := Update
+       if currentUpstream, err := apisix.FindCurrentUpstream(*u.Group, 
*u.Name, *u.FullName); err != nil {
+               glog.Errorf("solver upstream failed, find upstream from etcd 
failed, upstream: %+v, err: %+v", u, err)
+               errNotify = err
+               return
+       } else {
+               paddingUpstream(u, currentUpstream)
+               // diff
+               hasDiff, err := utils.HasDiff(u, currentUpstream)
+               if err != nil {
+                       errNotify = err
                        return
-               } else {
-                       paddingUpstream(u, currentUpstream)
-                       // diff
-                       hasDiff, _ := utils.HasDiff(u, currentUpstream)
-                       if hasDiff {
-                               if *u.ID != strconv.Itoa(0) {
-                                       op = Update
-                                       // 0.field check
-                                       needToUpdate := true
-                                       if currentUpstream.FromKind != nil && 
*(currentUpstream.FromKind) == ApisixUpstream { // update from ApisixUpstream
-                                               if u.FromKind == nil || 
(u.FromKind != nil && *(u.FromKind) != ApisixUpstream) {
-                                                       // currentUpstream > u
-                                                       // set lb && health 
check
-                                                       needToUpdate = false
-                                               }
-                                       }
-                                       if needToUpdate {
-                                               // 1.sync memDB
-                                               upstreamDB := 
&db.UpstreamDB{Upstreams: []*v1.Upstream{u}}
-                                               if err := 
upstreamDB.UpdateUpstreams(); err != nil {
-                                                       glog.Errorf("solver 
upstream failed, update upstream to local db failed, err: %s", err.Error())
-                                                       return
-                                               }
-                                               // 2.sync apisix
-                                               if err = 
apisix.UpdateUpstream(u); err != nil {
-                                                       glog.Errorf("solver 
upstream failed, update upstream to etcd failed, err: %+v", err)
-                                                       return
-                                               }
+               }
+               if hasDiff {
+                       if *u.ID != strconv.Itoa(0) {
+                               op = Update
+                               // 0.field check
+                               needToUpdate := true
+                               if currentUpstream.FromKind != nil && 
*(currentUpstream.FromKind) == ApisixUpstream { // update from ApisixUpstream
+                                       if u.FromKind == nil || (u.FromKind != 
nil && *(u.FromKind) != ApisixUpstream) {
+                                               // currentUpstream > u
+                                               // set lb && health check
+                                               needToUpdate = false
                                        }
-                                       // if fromKind == WatchFromKind
-                                       if u.FromKind != nil && *u.FromKind == 
WatchFromKind {
-                                               // 1.update nodes
-                                               if err = apisix.PatchNodes(u, 
u.Nodes); err != nil {
-                                                       glog.Errorf("solver 
upstream failed, patch node info to etcd failed, err: %+v", err)
-                                                       return
-                                               }
-                                               // 2. sync memDB
-                                               us := []*v1.Upstream{u}
-                                               if !needToUpdate {
-                                                       currentUpstream.Nodes = 
u.Nodes
-                                                       us = 
[]*v1.Upstream{currentUpstream}
-                                               }
-                                               upstreamDB := 
&db.UpstreamDB{Upstreams: us}
-                                               if err := 
upstreamDB.UpdateUpstreams(); err != nil {
-                                                       glog.Errorf("solver 
upstream failed, update upstream to local db failed, err: %s", err.Error())
-                                                       return
-                                               }
+                               }
+                               if needToUpdate {
+                                       // 1.sync memDB
+                                       upstreamDB := &db.UpstreamDB{Upstreams: 
[]*v1.Upstream{u}}
+                                       if err := upstreamDB.UpdateUpstreams(); 
err != nil {
+                                               glog.Errorf("solver upstream 
failed, update upstream to local db failed, err: %s", err.Error())
+                                               errNotify = err
+                                               return
                                        }
-                               } else {
-                                       op = Create
-                                       // 1.sync apisix and get response
-                                       if upstreamResponse, err := 
apisix.AddUpstream(u); err != nil {
+                                       // 2.sync apisix
+                                       if err = apisix.UpdateUpstream(u); err 
!= nil {
                                                glog.Errorf("solver upstream 
failed, update upstream to etcd failed, err: %+v", err)
+                                               errNotify = err
                                                return
-                                       } else {
-                                               tmp := 
strings.Split(*upstreamResponse.Upstream.Key, "/")
-                                               *u.ID = tmp[len(tmp)-1]
                                        }
-                                       // 2.sync memDB
-                                       
//apisix.InsertUpstreams([]*v1.Upstream{u})
-                                       upstreamDB := &db.UpstreamDB{Upstreams: 
[]*v1.Upstream{u}}
-                                       upstreamDB.InsertUpstreams()
                                }
+                               // if fromKind == WatchFromKind
+                               if u.FromKind != nil && *u.FromKind == 
WatchFromKind {
+                                       // 1.update nodes
+                                       if err = apisix.PatchNodes(u, u.Nodes); 
err != nil {
+                                               glog.Errorf("solver upstream 
failed, patch node info to etcd failed, err: %+v", err)
+                                               errNotify = err
+                                               return
+                                       }
+                                       // 2. sync memDB
+                                       us := []*v1.Upstream{u}
+                                       if !needToUpdate {
+                                               currentUpstream.Nodes = u.Nodes
+                                               us = 
[]*v1.Upstream{currentUpstream}
+                                       }
+                                       upstreamDB := &db.UpstreamDB{Upstreams: 
us}
+                                       if err := upstreamDB.UpdateUpstreams(); 
err != nil {
+                                               glog.Errorf("solver upstream 
failed, update upstream to local db failed, err: %s", err.Error())
+                                               errNotify = err
+                                               return
+                                       }
+                               }
+                       } else {
+                               op = Create
+                               // 1.sync apisix and get response
+                               if upstreamResponse, err := 
apisix.AddUpstream(u); err != nil {
+                                       glog.Errorf("solver upstream failed, 
update upstream to etcd failed, err: %+v", err)
+                                       errNotify = err
+                                       return
+                               } else {
+                                       tmp := 
strings.Split(*upstreamResponse.Upstream.Key, "/")
+                                       *u.ID = tmp[len(tmp)-1]
+                               }
+                               // 2.sync memDB
+                               //apisix.InsertUpstreams([]*v1.Upstream{u})
+                               upstreamDB := &db.UpstreamDB{Upstreams: 
[]*v1.Upstream{u}}
+                               upstreamDB.InsertUpstreams()
                        }
                }
-               glog.V(2).Infof("solver upstream %s:%s", op, *u.Name)
-               // anyway, broadcast to service
-               serviceWorkers := swg[*u.Name]
-               for _, sw := range serviceWorkers {
-                       event := &Event{Kind: UpstreamKind, Op: op, Obj: u}
-                       sw.Event <- *event
-               }
+       }
+       glog.V(2).Infof("solver upstream %s:%s", op, *u.Name)

Review comment:
       The logging here should be changed from glog to `pkg/log`.

##########
File path: test/e2e/scaffold/crd.go
##########
@@ -148,3 +153,22 @@ func (s *Scaffold) EnsureNumApisixUpstreamsCreated(desired 
int) error {
        }
        return ensureNumApisixCRDsCreated(u.String(), desired)
 }
+

Review comment:
       Should add comments for this method.

##########
File path: test/e2e/scaffold/ingress.go
##########
@@ -78,7 +78,7 @@ spec:
               port: 8080
             timeoutSeconds: 2
           image: "viewking/apisix-ingress-controller:dev"
-          imagePullPolicy: IfNotPresent
+          imagePullPolicy: Always

Review comment:
       It will be too slow if we use the `Always` policy.

##########
File path: pkg/seven/state/builder.go
##########
@@ -84,161 +86,198 @@ func paddingUpstream(upstream *v1.Upstream, 
currentUpstream *v1.Upstream) {
 // NewRouteWorkers make routeWrokers group by service per CRD
 // 1.make routes group by (1_2_3) it may be a map like map[1_2_3][]Route;
 // 2.route is listenning Event from the ready of 1_2_3;
-func NewRouteWorkers(routes []*v1.Route) RouteWorkerGroup {
+func NewRouteWorkers(ctx context.Context, routes []*v1.Route, wg 
*sync.WaitGroup, errorChan chan CRDStatus) RouteWorkerGroup {
        rwg := make(RouteWorkerGroup)
        for _, r := range routes {
-               quit := make(chan Quit)
-               rw := &routeWorker{Route: r, Quit: quit}
+               rw := &routeWorker{Route: r, Ctx: ctx, Wg: wg, ErrorChan: 
errorChan}
                rw.start()
                rwg.Add(*r.ServiceName, rw)
        }
        return rwg
 }
 
 // 3.route get the Event and trigger a padding for object,then diff,sync;
-func (r *routeWorker) trigger(event Event) error {
-       defer close(r.Quit)
+func (r *routeWorker) trigger(event Event) {
+       var errNotify error
+       defer func(err error) {
+               if err != nil {
+                       r.ErrorChan <- CRDStatus{Id: "", Status: "failure", 
Err: err}
+               }
+               r.Wg.Done()
+       }(errNotify)
        // consumer Event
        service := event.Obj.(*v1.Service)
        r.ServiceId = service.ID
        glog.V(2).Infof("trigger routeWorker %s from %s, %s", *r.Name, 
event.Op, *service.Name)
 
        // padding
-       currentRoute, _ := apisix.FindCurrentRoute(r.Route)
+       currentRoute, err := apisix.FindCurrentRoute(r.Route)
+       if err != nil && err.Error() != "NOT FOUND" {

Review comment:
       Why not expose a sentinel error like `ErrNotFound`?
   
   ```go
   if err != nil && err != ErrNotFound {
       ......
   }
   ```

##########
File path: test/e2e/scaffold/httpbin.go
##########
@@ -84,7 +85,9 @@ spec:
 )
 
 func (s *Scaffold) newHTTPBIN() (*corev1.Service, error) {
-       if err := k8s.KubectlApplyFromStringE(s.t, s.kubectlOptions, 
_httpbinDeployment); err != nil {
+       httpbinDeployment := fmt.Sprintf(_httpbinDeployment, 1)
+       fmt.Println(httpbinDeployment)

Review comment:
       Remove the debug code.

##########
File path: test/e2e/scaffold/httpbin.go
##########
@@ -96,3 +99,11 @@ func (s *Scaffold) newHTTPBIN() (*corev1.Service, error) {
        }
        return svc, nil
 }
+
+func (s *Scaffold) ScaleHTTPBIN(num int) error {

Review comment:
       Does Terratest have a method like `kubectl scale`? If so, we can use it.

##########
File path: test/e2e/ingress/route.go
##########
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package ingress
+
+import (
+       "github.com/api7/ingress-controller/test/e2e/scaffold"
+       "github.com/onsi/ginkgo"
+       "github.com/stretchr/testify/assert"
+       "time"
+)
+
+var _ = ginkgo.Describe("upstream expansion", func() {
+       s := scaffold.NewDefaultScaffold()
+       ginkgo.It("create and then scale to 2 ", func() {
+               apisixRoute := `
+apiVersion: apisix.apache.org/v1
+kind: ApisixRoute
+metadata:
+  name: httpbin-route
+spec:
+  rules:
+  - host: httpbin.com
+    http:
+      paths:
+      - backend:
+          serviceName: httpbin-service-e2e-test
+          servicePort: 80
+        path: /ip
+`
+               s.CreateApisixRouteByString(apisixRoute)
+
+               err := s.EnsureNumApisixRoutesCreated(1)
+               assert.Nil(ginkgo.GinkgoT(), err, "Checking number of routes")
+               err = s.EnsureNumApisixUpstreamsCreated(1)
+               assert.Nil(ginkgo.GinkgoT(), err, "Checking number of 
upstreams")
+               s.ScaleHTTPBIN(2)
+               time.Sleep(25 * time.Second)

Review comment:
       Why sleep for 25s? If we need to wait for the service to become ready, 
just poll it.

##########
File path: pkg/seven/state/solver.go
##########
@@ -15,54 +15,99 @@
 package state
 
 import (
+       "context"
+       "errors"
        "github.com/api7/ingress-controller/pkg/seven/apisix"
        "github.com/api7/ingress-controller/pkg/seven/db"
-       v1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
+       "github.com/api7/ingress-controller/pkg/types/apisix/v1"
+       "sync"
+       "time"
 )
 
 var UpstreamQueue chan UpstreamQueueObj
-var ServiceQueue chan ServiceQueueObj
 
 func init() {
        UpstreamQueue = make(chan UpstreamQueueObj, 500)
-       ServiceQueue = make(chan ServiceQueueObj, 500)
        go WatchUpstream()
-       go WatchService()
-}
-
-func WatchService() {
-       for {
-               sqo := <-ServiceQueue
-               // solver service
-               SolverService(sqo.Services, sqo.RouteWorkerGroup)
-       }
 }
 
 func WatchUpstream() {
        for {
                uqo := <-UpstreamQueue
-               SolverUpstream(uqo.Upstreams, uqo.ServiceWorkerGroup)
+               SolverUpstream(uqo.Upstreams, uqo.ServiceWorkerGroup, uqo.Wg, 
uqo.ErrorChan)
        }
 }
 
 // Solver
-func (s *ApisixCombination) Solver() (bool, error) {
-       // 1.route workers
-       rwg := NewRouteWorkers(s.Routes)
-       // 2.service workers
-       swg := NewServiceWorkers(s.Services, &rwg)
-       //sqo := &ServiceQueueObj{Services: s.Services, RouteWorkerGroup: rwg}
-       //sqo.AddQueue()
-       // 3.upstream workers
-       uqo := &UpstreamQueueObj{Upstreams: s.Upstreams, ServiceWorkerGroup: 
swg}
+func (s *ApisixCombination) Solver() (string, error) {
+       // define the result notify
+       timeout := 15 * time.Second
+       resultChan := make(chan CRDStatus)
+       ctx := context.Background()
+       ctx, _ = context.WithTimeout(ctx, timeout)
+       go s.SyncWithGroup(ctx, "", resultChan)
+
+       // add timeout after 5s
+       return WaitWorkerGroup("", resultChan)
+}
+
+func waitTimeout(wg *sync.WaitGroup, timeout time.Duration, resultChan chan 
CRDStatus) {
+       c := make(chan struct{})
+       go func() {
+               defer close(c)
+               wg.Wait()
+       }()
+       select {
+       case <-c:
+               resultChan <- CRDStatus{Id: "", Status: "success", Err: nil}
+       case <-time.After(timeout):
+               resultChan <- CRDStatus{Id: "", Status: "failure", Err: 
errors.New("timeout")}
+       }
+}
+
+func (s *ApisixCombination) SyncWithGroup(ctx context.Context, id string, 
resultChan chan CRDStatus) {
+       var wg sync.WaitGroup
+       count := len(s.Routes) + len(s.Services) + len(s.Upstreams)
+       wg.Add(count)
+       // goroutine for sync route/service/upstream
+       // route
+       rwg := NewRouteWorkers(ctx, s.Routes, &wg, resultChan)
+       // service
+       swg := NewServiceWorkers(ctx, s.Services, &rwg, &wg, resultChan)
+       // upstream
+       uqo := &UpstreamQueueObj{Upstreams: s.Upstreams, ServiceWorkerGroup: 
swg, Wg: &wg, ErrorChan: resultChan}
        uqo.AddQueue()
-       return true, nil
+       // waitTimeout should be shorter than worker timeout
+       waitTimeout(&wg, 10*time.Second, resultChan)
+}
+
+func WaitWorkerGroup(id string, resultChan chan CRDStatus) (string, error) {
+       for {

Review comment:
       Your code can be simplified to one line, since the select only has one 
arm.
   
   ```
   r := <-resultChan
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to