tokers commented on a change in pull request #149:
URL: 
https://github.com/apache/apisix-ingress-controller/pull/149#discussion_r551695917



##########
File path: pkg/seven/state/builder.go
##########
@@ -84,161 +86,198 @@ func paddingUpstream(upstream *v1.Upstream, 
currentUpstream *v1.Upstream) {
 // NewRouteWorkers make routeWrokers group by service per CRD
 // 1.make routes group by (1_2_3) it may be a map like map[1_2_3][]Route;
 // 2.route is listenning Event from the ready of 1_2_3;
-func NewRouteWorkers(routes []*v1.Route) RouteWorkerGroup {
+func NewRouteWorkers(ctx context.Context, routes []*v1.Route, wg 
*sync.WaitGroup, errorChan chan CRDStatus) RouteWorkerGroup {
        rwg := make(RouteWorkerGroup)
        for _, r := range routes {
-               quit := make(chan Quit)
-               rw := &routeWorker{Route: r, Quit: quit}
+               rw := &routeWorker{Route: r, Ctx: ctx, Wg: wg, ErrorChan: 
errorChan}
                rw.start()
                rwg.Add(*r.ServiceName, rw)
        }
        return rwg
 }
 
 // 3.route get the Event and trigger a padding for object,then diff,sync;
-func (r *routeWorker) trigger(event Event) error {
-       defer close(r.Quit)
+func (r *routeWorker) trigger(event Event) {
+       var errNotify error
+       defer func() {
+               if errNotify != nil {
+                       r.ErrorChan <- CRDStatus{Id: "", Status: "failure", 
Err: errNotify}
+               }
+               r.Wg.Done()
+       }()
        // consumer Event
        service := event.Obj.(*v1.Service)
        r.ServiceId = service.ID
-       glog.V(2).Infof("trigger routeWorker %s from %s, %s", *r.Name, 
event.Op, *service.Name)
+       log.Infof("trigger routeWorker %s from %s, %s", *r.Name, event.Op, 
*service.Name)
 
        // padding
-       currentRoute, _ := apisix.FindCurrentRoute(r.Route)
+       currentRoute, err := apisix.FindCurrentRoute(r.Route)
+       if err != nil && !errors.Is(err, utils.NotFound) {
+               errNotify = err
+               return
+       }
        paddingRoute(r.Route, currentRoute)
        // diff
        hasDiff, err := utils.HasDiff(r.Route, currentRoute)
        // sync
        if err != nil {
-               return err
+               errNotify = err
+               return
        }
        if hasDiff {
-               r.sync()
+               err := r.sync()
+               if err != nil {
+                       errNotify = err
+                       return
+               }
        }
-       // todo broadcast
-
-       return nil
 }
 
 // sync
-func (r *routeWorker) sync() {
+func (r *routeWorker) sync() error {
        if *r.Route.ID != strconv.Itoa(0) {
                // 1. sync memDB
                db := &db.RouteDB{Routes: []*v1.Route{r.Route}}
                if err := db.UpdateRoute(); err != nil {
-                       glog.Errorf("update route failed, route: %#v, err: 
%+v", r.Route, err)
-                       return
+                       log.Errorf("update route failed, route: %#v, err: %+v", 
r.Route, err)
+                       return err
                }
                // 2. sync apisix
-               apisix.UpdateRoute(r.Route)
-               glog.V(2).Infof("update route %s, %s", *r.Name, *r.ServiceId)
+               if err := apisix.UpdateRoute(r.Route); err != nil {
+                       return err
+               }
+               log.Infof("update route %s, %s", *r.Name, *r.ServiceId)
        } else {
                // 1. sync apisix and get id
                if res, err := apisix.AddRoute(r.Route); err != nil {
-                       glog.Errorf("add route failed, route: %#v, err: %+v", 
r.Route, err)
-                       return
+                       log.Errorf("add route failed, route: %#v, err: %+v", 
r.Route, err)
+                       return err
                } else {
                        key := res.Route.Key
                        tmp := strings.Split(*key, "/")
                        *r.ID = tmp[len(tmp)-1]
                }
                // 2. sync memDB
                db := &db.RouteDB{Routes: []*v1.Route{r.Route}}
-               db.Insert()
-               glog.V(2).Infof("create route %s, %s", *r.Name, *r.ServiceId)
+               if err := db.Insert(); err != nil {
+                       return err
+               }
+               log.Infof("create route %s, %s", *r.Name, *r.ServiceId)
        }
+       return nil
 }
 
 // service
-func NewServiceWorkers(services []*v1.Service, rwg *RouteWorkerGroup) 
ServiceWorkerGroup {
+func NewServiceWorkers(ctx context.Context, services []*v1.Service, rwg 
*RouteWorkerGroup, wg *sync.WaitGroup, errorChan chan CRDStatus) 
ServiceWorkerGroup {
        swg := make(ServiceWorkerGroup)
        for _, s := range services {
-               quit := make(chan Quit)
-               rw := &serviceWorker{Service: s, Quit: quit}
+               rw := &serviceWorker{Service: s, Ctx: ctx, Wg: wg, ErrorChan: 
errorChan}
+               //rw.Wg.Add(1)
                rw.start(rwg)
                swg.Add(*s.UpstreamName, rw)
        }
        return swg
 }
 
 // upstream
-func SolverUpstream(upstreams []*v1.Upstream, swg ServiceWorkerGroup) {
+func SolverUpstream(upstreams []*v1.Upstream, swg ServiceWorkerGroup, wg 
*sync.WaitGroup, errorChan chan CRDStatus) {
        for _, u := range upstreams {
-               op := Update
-               if currentUpstream, err := apisix.FindCurrentUpstream(*u.Group, 
*u.Name, *u.FullName); err != nil {
-                       glog.Errorf("solver upstream failed, find upstream from 
etcd failed, upstream: %+v, err: %+v", u, err)
+               go SolverSingleUpstream(u, swg, wg, errorChan)
+       }
+}
+
+func SolverSingleUpstream(u *v1.Upstream, swg ServiceWorkerGroup, wg 
*sync.WaitGroup, errorChan chan CRDStatus) {
+       var errNotify error
+       defer func() {
+               if errNotify != nil {
+                       errorChan <- CRDStatus{Id: "", Status: "failure", Err: 
errNotify}
+               }
+               wg.Done()
+       }()
+       op := Update
+       if currentUpstream, err := apisix.FindCurrentUpstream(*u.Group, 
*u.Name, *u.FullName); err != nil {
+               log.Errorf("solver upstream failed, find upstream from etcd 
failed, upstream: %+v, err: %+v", u, err)
+               errNotify = err
+               return
+       } else {
+               paddingUpstream(u, currentUpstream)
+               // diff
+               hasDiff, err := utils.HasDiff(u, currentUpstream)
+               if err != nil {
+                       errNotify = err
                        return
-               } else {
-                       paddingUpstream(u, currentUpstream)
-                       // diff
-                       hasDiff, _ := utils.HasDiff(u, currentUpstream)
-                       if hasDiff {
-                               if *u.ID != strconv.Itoa(0) {
-                                       op = Update
-                                       // 0.field check
-                                       needToUpdate := true
-                                       if currentUpstream.FromKind != nil && 
*(currentUpstream.FromKind) == ApisixUpstream { // update from ApisixUpstream
-                                               if u.FromKind == nil || 
(u.FromKind != nil && *(u.FromKind) != ApisixUpstream) {
-                                                       // currentUpstream > u
-                                                       // set lb && health 
check
-                                                       needToUpdate = false
-                                               }
+               }
+               if hasDiff {
+                       if *u.ID != strconv.Itoa(0) {

Review comment:
       What means of the magic number `0`.

##########
File path: pkg/seven/apisix/route.go
##########
@@ -49,7 +49,7 @@ func FindCurrentRoute(route *v1.Route) (*v1.Route, error) {
                }
 
        }
-       return nil, fmt.Errorf("NOT FOUND")
+       return nil, utils.NotFound

Review comment:
       What about`ErrNotFound`? It a convention that all error object should be 
named with prefix `Err`.

##########
File path: pkg/seven/state/builder.go
##########
@@ -84,161 +86,198 @@ func paddingUpstream(upstream *v1.Upstream, 
currentUpstream *v1.Upstream) {
 // NewRouteWorkers make routeWrokers group by service per CRD
 // 1.make routes group by (1_2_3) it may be a map like map[1_2_3][]Route;
 // 2.route is listenning Event from the ready of 1_2_3;
-func NewRouteWorkers(routes []*v1.Route) RouteWorkerGroup {
+func NewRouteWorkers(ctx context.Context, routes []*v1.Route, wg 
*sync.WaitGroup, errorChan chan CRDStatus) RouteWorkerGroup {
        rwg := make(RouteWorkerGroup)
        for _, r := range routes {
-               quit := make(chan Quit)
-               rw := &routeWorker{Route: r, Quit: quit}
+               rw := &routeWorker{Route: r, Ctx: ctx, Wg: wg, ErrorChan: 
errorChan}
                rw.start()
                rwg.Add(*r.ServiceName, rw)
        }
        return rwg
 }
 
 // 3.route get the Event and trigger a padding for object,then diff,sync;
-func (r *routeWorker) trigger(event Event) error {
-       defer close(r.Quit)
+func (r *routeWorker) trigger(event Event) {
+       var errNotify error
+       defer func() {
+               if errNotify != nil {
+                       r.ErrorChan <- CRDStatus{Id: "", Status: "failure", 
Err: errNotify}
+               }
+               r.Wg.Done()
+       }()
        // consumer Event
        service := event.Obj.(*v1.Service)
        r.ServiceId = service.ID
-       glog.V(2).Infof("trigger routeWorker %s from %s, %s", *r.Name, 
event.Op, *service.Name)
+       log.Infof("trigger routeWorker %s from %s, %s", *r.Name, event.Op, 
*service.Name)
 
        // padding
-       currentRoute, _ := apisix.FindCurrentRoute(r.Route)
+       currentRoute, err := apisix.FindCurrentRoute(r.Route)
+       if err != nil && !errors.Is(err, utils.NotFound) {
+               errNotify = err
+               return
+       }
        paddingRoute(r.Route, currentRoute)
        // diff
        hasDiff, err := utils.HasDiff(r.Route, currentRoute)
        // sync
        if err != nil {
-               return err
+               errNotify = err
+               return
        }
        if hasDiff {
-               r.sync()
+               err := r.sync()
+               if err != nil {
+                       errNotify = err
+                       return
+               }
        }
-       // todo broadcast
-
-       return nil
 }
 
 // sync
-func (r *routeWorker) sync() {
+func (r *routeWorker) sync() error {
        if *r.Route.ID != strconv.Itoa(0) {
                // 1. sync memDB
                db := &db.RouteDB{Routes: []*v1.Route{r.Route}}
                if err := db.UpdateRoute(); err != nil {
-                       glog.Errorf("update route failed, route: %#v, err: 
%+v", r.Route, err)
-                       return
+                       log.Errorf("update route failed, route: %#v, err: %+v", 
r.Route, err)
+                       return err
                }
                // 2. sync apisix
-               apisix.UpdateRoute(r.Route)
-               glog.V(2).Infof("update route %s, %s", *r.Name, *r.ServiceId)
+               if err := apisix.UpdateRoute(r.Route); err != nil {
+                       return err
+               }
+               log.Infof("update route %s, %s", *r.Name, *r.ServiceId)
        } else {
                // 1. sync apisix and get id
                if res, err := apisix.AddRoute(r.Route); err != nil {
-                       glog.Errorf("add route failed, route: %#v, err: %+v", 
r.Route, err)
-                       return
+                       log.Errorf("add route failed, route: %#v, err: %+v", 
r.Route, err)
+                       return err
                } else {
                        key := res.Route.Key
                        tmp := strings.Split(*key, "/")
                        *r.ID = tmp[len(tmp)-1]
                }
                // 2. sync memDB
                db := &db.RouteDB{Routes: []*v1.Route{r.Route}}
-               db.Insert()
-               glog.V(2).Infof("create route %s, %s", *r.Name, *r.ServiceId)
+               if err := db.Insert(); err != nil {
+                       return err
+               }
+               log.Infof("create route %s, %s", *r.Name, *r.ServiceId)
        }
+       return nil
 }
 
 // service
-func NewServiceWorkers(services []*v1.Service, rwg *RouteWorkerGroup) 
ServiceWorkerGroup {
+func NewServiceWorkers(ctx context.Context, services []*v1.Service, rwg 
*RouteWorkerGroup, wg *sync.WaitGroup, errorChan chan CRDStatus) 
ServiceWorkerGroup {
        swg := make(ServiceWorkerGroup)
        for _, s := range services {
-               quit := make(chan Quit)
-               rw := &serviceWorker{Service: s, Quit: quit}
+               rw := &serviceWorker{Service: s, Ctx: ctx, Wg: wg, ErrorChan: 
errorChan}
+               //rw.Wg.Add(1)
                rw.start(rwg)
                swg.Add(*s.UpstreamName, rw)
        }
        return swg
 }
 
 // upstream
-func SolverUpstream(upstreams []*v1.Upstream, swg ServiceWorkerGroup) {
+func SolverUpstream(upstreams []*v1.Upstream, swg ServiceWorkerGroup, wg 
*sync.WaitGroup, errorChan chan CRDStatus) {
        for _, u := range upstreams {
-               op := Update
-               if currentUpstream, err := apisix.FindCurrentUpstream(*u.Group, 
*u.Name, *u.FullName); err != nil {
-                       glog.Errorf("solver upstream failed, find upstream from 
etcd failed, upstream: %+v, err: %+v", u, err)
+               go SolverSingleUpstream(u, swg, wg, errorChan)
+       }
+}
+
+func SolverSingleUpstream(u *v1.Upstream, swg ServiceWorkerGroup, wg 
*sync.WaitGroup, errorChan chan CRDStatus) {
+       var errNotify error
+       defer func() {
+               if errNotify != nil {
+                       errorChan <- CRDStatus{Id: "", Status: "failure", Err: 
errNotify}
+               }
+               wg.Done()
+       }()
+       op := Update
+       if currentUpstream, err := apisix.FindCurrentUpstream(*u.Group, 
*u.Name, *u.FullName); err != nil {
+               log.Errorf("solver upstream failed, find upstream from etcd 
failed, upstream: %+v, err: %+v", u, err)

Review comment:
       should be "find upstream from apisix failed".

##########
File path: pkg/seven/utils/types.go
##########
@@ -13,3 +13,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 package utils
+
+import "errors"
+
+var NotFound = errors.New("NOT FOUND")

Review comment:
       Should add comments for each export-able variable.

##########
File path: test/e2e/scaffold/httpbin.go
##########
@@ -84,7 +86,8 @@ spec:
 )
 
 func (s *Scaffold) newHTTPBIN() (*corev1.Service, error) {
-       if err := k8s.KubectlApplyFromStringE(s.t, s.kubectlOptions, 
_httpbinDeployment); err != nil {
+       httpbinDeployment := fmt.Sprintf(_httpbinDeployment, 1)

Review comment:
       If treat the _httpbinDeployment as a template, we should change its name 
to `_httpbinDeploymentTemplate`.

##########
File path: pkg/seven/state/solver.go
##########
@@ -15,54 +15,96 @@
 package state
 
 import (
+       "context"
+       "errors"
+       "sync"
+       "time"
+
        "github.com/api7/ingress-controller/pkg/seven/apisix"
        "github.com/api7/ingress-controller/pkg/seven/db"
-       v1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
+       "github.com/api7/ingress-controller/pkg/types/apisix/v1"
 )
 
 var UpstreamQueue chan UpstreamQueueObj
-var ServiceQueue chan ServiceQueueObj
 
 func init() {
        UpstreamQueue = make(chan UpstreamQueueObj, 500)
-       ServiceQueue = make(chan ServiceQueueObj, 500)
        go WatchUpstream()
-       go WatchService()
-}
-
-func WatchService() {
-       for {
-               sqo := <-ServiceQueue
-               // solver service
-               SolverService(sqo.Services, sqo.RouteWorkerGroup)
-       }
 }
 
 func WatchUpstream() {
        for {
                uqo := <-UpstreamQueue
-               SolverUpstream(uqo.Upstreams, uqo.ServiceWorkerGroup)
+               SolverUpstream(uqo.Upstreams, uqo.ServiceWorkerGroup, uqo.Wg, 
uqo.ErrorChan)
        }
 }
 
 // Solver
-func (s *ApisixCombination) Solver() (bool, error) {
-       // 1.route workers
-       rwg := NewRouteWorkers(s.Routes)
-       // 2.service workers
-       swg := NewServiceWorkers(s.Services, &rwg)
-       //sqo := &ServiceQueueObj{Services: s.Services, RouteWorkerGroup: rwg}
-       //sqo.AddQueue()
-       // 3.upstream workers
-       uqo := &UpstreamQueueObj{Upstreams: s.Upstreams, ServiceWorkerGroup: 
swg}
+func (s *ApisixCombination) Solver() (string, error) {
+       // define the result notify
+       timeout := 1500 * time.Second

Review comment:
       Why 1500s?

##########
File path: pkg/seven/state/service_worker.go
##########
@@ -64,114 +67,93 @@ func (w *serviceWorker) trigger(event Event, rwg 
*RouteWorkerGroup) error {
        // add to queue
        services := []*v1.Service{w.Service}
        sqo := &ServiceQueueObj{Services: services, RouteWorkerGroup: *rwg}
-       sqo.AddQueue()
-
-       //op := Update
-       //// padding
-       //currentService, _ := apisix.FindCurrentService(*w.Service.Name)
-       //paddingService(w.Service, currentService)
-       //// diff
-       //hasDiff, err := utils.HasDiff(w.Service, currentService)
-       //// sync
-       //if err != nil {
-       //      return err
-       //}
-       //if hasDiff {
-       //      if *w.Service.ID == strconv.Itoa(0) {
-       //              op = Create
-       //              // 1. sync apisix and get id
-       //              if serviceResponse, err := apisix.AddService(w.Service, 
conf.BaseUrl); err != nil {
-       //                      // todo log error
-       //                      glog.Info(err.Error())
-       //              }else {
-       //                      tmp := 
strings.Split(*serviceResponse.Service.Key, "/")
-       //                      *w.Service.ID = tmp[len(tmp) - 1]
-       //              }
-       //              // 2. sync memDB
-       //              db := &db.ServiceDB{Services: []*v1.Service{w.Service}}
-       //              db.Insert()
-       //              glog.Infof("create service %s, %s", *w.Name, 
*w.UpstreamId)
-       //      }else {
-       //              op = Update
-       //              // 1. sync memDB
-       //              db := db.ServiceDB{Services: []*v1.Service{w.Service}}
-       //              if err := db.UpdateService(); err != nil {
-       //                      // todo log error
-       //              }
-       //              // 2. sync apisix
-       //              apisix.UpdateService(w.Service, conf.BaseUrl)
-       //              glog.Infof("update service %s, %s", *w.Name, 
*w.UpstreamId)
-       //      }
-       //}
-       //// broadcast to route
-       //routeWorkers := (*rwg)[*w.Service.Name]
-       //for _, rw := range routeWorkers{
-       //      event := &Event{Kind: ServiceKind, Op: op, Obj: w.Service}
-       //      glog.Infof("send event %s, %s, %s", event.Kind, event.Op, 
*w.Service.Name)
-       //      rw.Event <- *event
-       //}
+       //sqo.AddQueue()
+
+       SolverService(sqo.Services, sqo.RouteWorkerGroup, w.Wg, w.ErrorChan)
        return nil
 }
 
-func SolverService(services []*v1.Service, rwg RouteWorkerGroup) error {
+func SolverService(services []*v1.Service, rwg RouteWorkerGroup, wg 
*sync.WaitGroup, errorChan chan CRDStatus) {
        for _, svc := range services {
-               op := Update
-               // padding
-               currentService, _ := apisix.FindCurrentService(*svc.Group, 
*svc.Name, *svc.FullName)
-               paddingService(svc, currentService)
-               // diff
-               hasDiff, err := utils.HasDiff(svc, currentService)
-               // sync
-               if err != nil {
-                       return err
+               go SolverSingleService(svc, rwg, wg, errorChan)
+       }
+}
+
+func SolverSingleService(svc *v1.Service, rwg RouteWorkerGroup, wg 
*sync.WaitGroup, errorChan chan CRDStatus) {
+       var errNotify error
+       defer func() {
+               if errNotify != nil {
+                       errorChan <- CRDStatus{Id: "", Status: "failure", Err: 
errNotify}
                }
-               if hasDiff {
-                       if *svc.ID == strconv.Itoa(0) {
-                               op = Create
-                               // 1. sync apisix and get id
-                               if serviceResponse, err := 
apisix.AddService(svc); err != nil {
-                                       // todo log error
-                                       glog.V(2).Info(err.Error())
-                               } else {
-                                       tmp := 
strings.Split(*serviceResponse.Service.Key, "/")
-                                       *svc.ID = tmp[len(tmp)-1]
-                               }
-                               // 2. sync memDB
-                               db := &db.ServiceDB{Services: 
[]*v1.Service{svc}}
-                               db.Insert()
-                               glog.V(2).Infof("create service %s, %s", 
*svc.Name, *svc.UpstreamId)
+               wg.Done()
+       }()
+
+       op := Update
+       // padding
+       currentService, _ := apisix.FindCurrentService(*svc.Group, *svc.Name, 
*svc.FullName)
+       paddingService(svc, currentService)
+       // diff
+       hasDiff, err := utils.HasDiff(svc, currentService)
+       // sync
+       if err != nil {
+               errNotify = err
+               return
+       }
+       if hasDiff {
+               if *svc.ID == strconv.Itoa(0) {

Review comment:
       Bad magic number, what means of "0"?

##########
File path: test/e2e/scaffold/ingress.go
##########
@@ -78,7 +78,7 @@ spec:
               port: 8080
             timeoutSeconds: 2
           image: "viewking/apisix-ingress-controller:dev"
-          imagePullPolicy: IfNotPresent
+          imagePullPolicy: Always

Review comment:
       Even so, this should be an option, since in CI environment, we don't 
have this issue.

##########
File path: test/e2e/scaffold/scaffold.go
##########
@@ -208,3 +210,7 @@ func waitExponentialBackoff(condFunc func() (bool, error)) 
error {
        }
        return wait.ExponentialBackoff(backoff, condFunc)
 }
+
+func (s *Scaffold) WaitUntilNumPodsCreatedE(selector metav1.ListOptions, 
desiredCount int, retries int, interval time.Duration) error {
+       return k8s.WaitUntilNumPodsCreatedE(s.t, s.kubectlOptions, selector, 
desiredCount, retries, interval)

Review comment:
       What's the meaning for this encapsulation?

##########
File path: pkg/seven/state/service_worker.go
##########
@@ -64,114 +67,93 @@ func (w *serviceWorker) trigger(event Event, rwg 
*RouteWorkerGroup) error {
        // add to queue
        services := []*v1.Service{w.Service}
        sqo := &ServiceQueueObj{Services: services, RouteWorkerGroup: *rwg}
-       sqo.AddQueue()
-
-       //op := Update
-       //// padding
-       //currentService, _ := apisix.FindCurrentService(*w.Service.Name)
-       //paddingService(w.Service, currentService)
-       //// diff
-       //hasDiff, err := utils.HasDiff(w.Service, currentService)
-       //// sync
-       //if err != nil {
-       //      return err
-       //}
-       //if hasDiff {
-       //      if *w.Service.ID == strconv.Itoa(0) {
-       //              op = Create
-       //              // 1. sync apisix and get id
-       //              if serviceResponse, err := apisix.AddService(w.Service, 
conf.BaseUrl); err != nil {
-       //                      // todo log error
-       //                      glog.Info(err.Error())
-       //              }else {
-       //                      tmp := 
strings.Split(*serviceResponse.Service.Key, "/")
-       //                      *w.Service.ID = tmp[len(tmp) - 1]
-       //              }
-       //              // 2. sync memDB
-       //              db := &db.ServiceDB{Services: []*v1.Service{w.Service}}
-       //              db.Insert()
-       //              glog.Infof("create service %s, %s", *w.Name, 
*w.UpstreamId)
-       //      }else {
-       //              op = Update
-       //              // 1. sync memDB
-       //              db := db.ServiceDB{Services: []*v1.Service{w.Service}}
-       //              if err := db.UpdateService(); err != nil {
-       //                      // todo log error
-       //              }
-       //              // 2. sync apisix
-       //              apisix.UpdateService(w.Service, conf.BaseUrl)
-       //              glog.Infof("update service %s, %s", *w.Name, 
*w.UpstreamId)
-       //      }
-       //}
-       //// broadcast to route
-       //routeWorkers := (*rwg)[*w.Service.Name]
-       //for _, rw := range routeWorkers{
-       //      event := &Event{Kind: ServiceKind, Op: op, Obj: w.Service}
-       //      glog.Infof("send event %s, %s, %s", event.Kind, event.Op, 
*w.Service.Name)
-       //      rw.Event <- *event
-       //}
+       //sqo.AddQueue()
+
+       SolverService(sqo.Services, sqo.RouteWorkerGroup, w.Wg, w.ErrorChan)
        return nil
 }
 
-func SolverService(services []*v1.Service, rwg RouteWorkerGroup) error {
+func SolverService(services []*v1.Service, rwg RouteWorkerGroup, wg 
*sync.WaitGroup, errorChan chan CRDStatus) {
        for _, svc := range services {
-               op := Update
-               // padding
-               currentService, _ := apisix.FindCurrentService(*svc.Group, 
*svc.Name, *svc.FullName)
-               paddingService(svc, currentService)
-               // diff
-               hasDiff, err := utils.HasDiff(svc, currentService)
-               // sync
-               if err != nil {
-                       return err
+               go SolverSingleService(svc, rwg, wg, errorChan)
+       }
+}
+
+func SolverSingleService(svc *v1.Service, rwg RouteWorkerGroup, wg 
*sync.WaitGroup, errorChan chan CRDStatus) {
+       var errNotify error
+       defer func() {
+               if errNotify != nil {
+                       errorChan <- CRDStatus{Id: "", Status: "failure", Err: 
errNotify}
                }
-               if hasDiff {
-                       if *svc.ID == strconv.Itoa(0) {
-                               op = Create
-                               // 1. sync apisix and get id
-                               if serviceResponse, err := 
apisix.AddService(svc); err != nil {
-                                       // todo log error
-                                       glog.V(2).Info(err.Error())
-                               } else {
-                                       tmp := 
strings.Split(*serviceResponse.Service.Key, "/")
-                                       *svc.ID = tmp[len(tmp)-1]
-                               }
-                               // 2. sync memDB
-                               db := &db.ServiceDB{Services: 
[]*v1.Service{svc}}
-                               db.Insert()
-                               glog.V(2).Infof("create service %s, %s", 
*svc.Name, *svc.UpstreamId)
+               wg.Done()
+       }()
+
+       op := Update
+       // padding
+       currentService, _ := apisix.FindCurrentService(*svc.Group, *svc.Name, 
*svc.FullName)
+       paddingService(svc, currentService)
+       // diff
+       hasDiff, err := utils.HasDiff(svc, currentService)
+       // sync
+       if err != nil {
+               errNotify = err
+               return
+       }
+       if hasDiff {
+               if *svc.ID == strconv.Itoa(0) {
+                       op = Create
+                       // 1. sync apisix and get id
+                       if serviceResponse, err := apisix.AddService(svc); err 
!= nil {
+                               // todo log error
+                               glog.V(2).Info(err.Error())

Review comment:
       Should use `pkg/log`.

##########
File path: pkg/seven/state/solver.go
##########
@@ -15,54 +15,96 @@
 package state
 
 import (
+       "context"
+       "errors"
+       "sync"
+       "time"
+
        "github.com/api7/ingress-controller/pkg/seven/apisix"
        "github.com/api7/ingress-controller/pkg/seven/db"
-       v1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
+       "github.com/api7/ingress-controller/pkg/types/apisix/v1"
 )
 
 var UpstreamQueue chan UpstreamQueueObj
-var ServiceQueue chan ServiceQueueObj
 
 func init() {
        UpstreamQueue = make(chan UpstreamQueueObj, 500)
-       ServiceQueue = make(chan ServiceQueueObj, 500)
        go WatchUpstream()
-       go WatchService()
-}
-
-func WatchService() {
-       for {
-               sqo := <-ServiceQueue
-               // solver service
-               SolverService(sqo.Services, sqo.RouteWorkerGroup)
-       }
 }
 
 func WatchUpstream() {
        for {
                uqo := <-UpstreamQueue
-               SolverUpstream(uqo.Upstreams, uqo.ServiceWorkerGroup)
+               SolverUpstream(uqo.Upstreams, uqo.ServiceWorkerGroup, uqo.Wg, 
uqo.ErrorChan)
        }
 }
 
 // Solver
-func (s *ApisixCombination) Solver() (bool, error) {
-       // 1.route workers
-       rwg := NewRouteWorkers(s.Routes)
-       // 2.service workers
-       swg := NewServiceWorkers(s.Services, &rwg)
-       //sqo := &ServiceQueueObj{Services: s.Services, RouteWorkerGroup: rwg}
-       //sqo.AddQueue()
-       // 3.upstream workers
-       uqo := &UpstreamQueueObj{Upstreams: s.Upstreams, ServiceWorkerGroup: 
swg}
+func (s *ApisixCombination) Solver() (string, error) {
+       // define the result notify
+       timeout := 1500 * time.Second
+       resultChan := make(chan CRDStatus)
+       ctx := context.Background()
+       ctx, _ = context.WithTimeout(ctx, timeout)
+       go s.SyncWithGroup(ctx, "", resultChan)
+
+       // add timeout after 5s
+       return WaitWorkerGroup("", resultChan)
+}
+
+func waitTimeout(wg *sync.WaitGroup, timeout time.Duration, resultChan chan 
CRDStatus) {
+       c := make(chan struct{})
+       go func() {
+               defer close(c)
+               wg.Wait()
+       }()
+       select {
+       case <-c:
+               resultChan <- CRDStatus{Id: "", Status: "success", Err: nil}
+       case <-time.After(timeout):
+               resultChan <- CRDStatus{Id: "", Status: "failure", Err: 
errors.New("timeout")}
+       }
+}
+
+func (s *ApisixCombination) SyncWithGroup(ctx context.Context, id string, 
resultChan chan CRDStatus) {
+       var wg sync.WaitGroup
+       count := len(s.Routes) + len(s.Services) + len(s.Upstreams)
+       wg.Add(count)
+       // goroutine for sync route/service/upstream
+       // route
+       rwg := NewRouteWorkers(ctx, s.Routes, &wg, resultChan)
+       // service
+       swg := NewServiceWorkers(ctx, s.Services, &rwg, &wg, resultChan)
+       // upstream
+       uqo := &UpstreamQueueObj{Upstreams: s.Upstreams, ServiceWorkerGroup: 
swg, Wg: &wg, ErrorChan: resultChan}
        uqo.AddQueue()
-       return true, nil
+       // waitTimeout should be shorter than worker timeout
+       waitTimeout(&wg, 100*time.Second, resultChan)

Review comment:
       Already have `context`, why still a hard coded timeout value needed here?

##########
File path: test/e2e/go.mod
##########
@@ -6,12 +6,17 @@ require (
        github.com/gavv/httpexpect/v2 v2.1.0
        github.com/gorilla/websocket v1.4.2 // indirect
        github.com/gruntwork-io/terratest v0.31.2
+       github.com/gxthrj/apisix-ingress-types v0.1.3
+       github.com/gxthrj/apisix-types v0.1.3
+       github.com/gxthrj/seven v0.2.7

Review comment:
       Already imports it into this repo, why references to the old one?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to