gxthrj commented on a change in pull request #149:
URL:
https://github.com/apache/apisix-ingress-controller/pull/149#discussion_r551312769
##########
File path: pkg/seven/state/builder.go
##########
@@ -84,161 +86,198 @@ func paddingUpstream(upstream *v1.Upstream,
currentUpstream *v1.Upstream) {
// NewRouteWorkers make routeWrokers group by service per CRD
// 1.make routes group by (1_2_3) it may be a map like map[1_2_3][]Route;
// 2.route is listenning Event from the ready of 1_2_3;
-func NewRouteWorkers(routes []*v1.Route) RouteWorkerGroup {
+func NewRouteWorkers(ctx context.Context, routes []*v1.Route, wg
*sync.WaitGroup, errorChan chan CRDStatus) RouteWorkerGroup {
rwg := make(RouteWorkerGroup)
for _, r := range routes {
- quit := make(chan Quit)
- rw := &routeWorker{Route: r, Quit: quit}
+ rw := &routeWorker{Route: r, Ctx: ctx, Wg: wg, ErrorChan:
errorChan}
rw.start()
rwg.Add(*r.ServiceName, rw)
}
return rwg
}
// 3.route get the Event and trigger a padding for object,then diff,sync;
-func (r *routeWorker) trigger(event Event) error {
- defer close(r.Quit)
+func (r *routeWorker) trigger(event Event) {
+ var errNotify error
+ defer func(err error) {
+ if err != nil {
+ r.ErrorChan <- CRDStatus{Id: "", Status: "failure",
Err: err}
+ }
+ r.Wg.Done()
+ }(errNotify)
// consumer Event
service := event.Obj.(*v1.Service)
r.ServiceId = service.ID
glog.V(2).Infof("trigger routeWorker %s from %s, %s", *r.Name,
event.Op, *service.Name)
// padding
- currentRoute, _ := apisix.FindCurrentRoute(r.Route)
+ currentRoute, err := apisix.FindCurrentRoute(r.Route)
+ if err != nil && err.Error() != "NOT FOUND" {
+ errNotify = err
+ return
+ }
paddingRoute(r.Route, currentRoute)
// diff
hasDiff, err := utils.HasDiff(r.Route, currentRoute)
// sync
if err != nil {
- return err
+ errNotify = err
+ return
}
if hasDiff {
- r.sync()
+ err := r.sync()
+ if err != nil {
+ errNotify = err
+ return
+ }
}
- // todo broadcast
-
- return nil
}
// sync
-func (r *routeWorker) sync() {
+func (r *routeWorker) sync() error {
if *r.Route.ID != strconv.Itoa(0) {
// 1. sync memDB
db := &db.RouteDB{Routes: []*v1.Route{r.Route}}
if err := db.UpdateRoute(); err != nil {
glog.Errorf("update route failed, route: %#v, err:
%+v", r.Route, err)
- return
+ return err
}
// 2. sync apisix
- apisix.UpdateRoute(r.Route)
+ if err := apisix.UpdateRoute(r.Route); err != nil {
+ return err
+ }
glog.V(2).Infof("update route %s, %s", *r.Name, *r.ServiceId)
} else {
// 1. sync apisix and get id
if res, err := apisix.AddRoute(r.Route); err != nil {
glog.Errorf("add route failed, route: %#v, err: %+v",
r.Route, err)
- return
+ return err
} else {
key := res.Route.Key
tmp := strings.Split(*key, "/")
*r.ID = tmp[len(tmp)-1]
}
// 2. sync memDB
db := &db.RouteDB{Routes: []*v1.Route{r.Route}}
- db.Insert()
+ if err := db.Insert(); err != nil {
+ return err
+ }
glog.V(2).Infof("create route %s, %s", *r.Name, *r.ServiceId)
}
+ return nil
}
// service
-func NewServiceWorkers(services []*v1.Service, rwg *RouteWorkerGroup)
ServiceWorkerGroup {
+func NewServiceWorkers(ctx context.Context, services []*v1.Service, rwg
*RouteWorkerGroup, wg *sync.WaitGroup, errorChan chan CRDStatus)
ServiceWorkerGroup {
swg := make(ServiceWorkerGroup)
for _, s := range services {
- quit := make(chan Quit)
- rw := &serviceWorker{Service: s, Quit: quit}
+ rw := &serviceWorker{Service: s, Ctx: ctx, Wg: wg, ErrorChan:
errorChan}
+ //rw.Wg.Add(1)
rw.start(rwg)
swg.Add(*s.UpstreamName, rw)
}
return swg
}
// upstream
-func SolverUpstream(upstreams []*v1.Upstream, swg ServiceWorkerGroup) {
+func SolverUpstream(upstreams []*v1.Upstream, swg ServiceWorkerGroup, wg
*sync.WaitGroup, errorChan chan CRDStatus) {
for _, u := range upstreams {
- op := Update
- if currentUpstream, err := apisix.FindCurrentUpstream(*u.Group,
*u.Name, *u.FullName); err != nil {
- glog.Errorf("solver upstream failed, find upstream from
etcd failed, upstream: %+v, err: %+v", u, err)
+ go SolverSingleUpstream(u, swg, wg, errorChan)
+ }
+}
+
+func SolverSingleUpstream(u *v1.Upstream, swg ServiceWorkerGroup, wg
*sync.WaitGroup, errorChan chan CRDStatus) {
+ var errNotify error
+ defer func(err error) {
+ if err != nil {
+ errorChan <- CRDStatus{Id: "", Status: "failure", Err:
err}
+ }
+ wg.Done()
+ }(errNotify)
+ op := Update
+ if currentUpstream, err := apisix.FindCurrentUpstream(*u.Group,
*u.Name, *u.FullName); err != nil {
+ glog.Errorf("solver upstream failed, find upstream from etcd
failed, upstream: %+v, err: %+v", u, err)
+ errNotify = err
+ return
+ } else {
+ paddingUpstream(u, currentUpstream)
+ // diff
+ hasDiff, err := utils.HasDiff(u, currentUpstream)
+ if err != nil {
+ errNotify = err
return
- } else {
- paddingUpstream(u, currentUpstream)
- // diff
- hasDiff, _ := utils.HasDiff(u, currentUpstream)
- if hasDiff {
- if *u.ID != strconv.Itoa(0) {
- op = Update
- // 0.field check
- needToUpdate := true
- if currentUpstream.FromKind != nil &&
*(currentUpstream.FromKind) == ApisixUpstream { // update from ApisixUpstream
- if u.FromKind == nil ||
(u.FromKind != nil && *(u.FromKind) != ApisixUpstream) {
- // currentUpstream > u
- // set lb && health
check
- needToUpdate = false
- }
- }
- if needToUpdate {
- // 1.sync memDB
- upstreamDB :=
&db.UpstreamDB{Upstreams: []*v1.Upstream{u}}
- if err :=
upstreamDB.UpdateUpstreams(); err != nil {
- glog.Errorf("solver
upstream failed, update upstream to local db failed, err: %s", err.Error())
- return
- }
- // 2.sync apisix
- if err =
apisix.UpdateUpstream(u); err != nil {
- glog.Errorf("solver
upstream failed, update upstream to etcd failed, err: %+v", err)
- return
- }
+ }
+ if hasDiff {
+ if *u.ID != strconv.Itoa(0) {
+ op = Update
+ // 0.field check
+ needToUpdate := true
+ if currentUpstream.FromKind != nil &&
*(currentUpstream.FromKind) == ApisixUpstream { // update from ApisixUpstream
+ if u.FromKind == nil || (u.FromKind !=
nil && *(u.FromKind) != ApisixUpstream) {
+ // currentUpstream > u
+ // set lb && health check
+ needToUpdate = false
}
- // if fromKind == WatchFromKind
- if u.FromKind != nil && *u.FromKind ==
WatchFromKind {
- // 1.update nodes
- if err = apisix.PatchNodes(u,
u.Nodes); err != nil {
- glog.Errorf("solver
upstream failed, patch node info to etcd failed, err: %+v", err)
- return
- }
- // 2. sync memDB
- us := []*v1.Upstream{u}
- if !needToUpdate {
- currentUpstream.Nodes =
u.Nodes
- us =
[]*v1.Upstream{currentUpstream}
- }
- upstreamDB :=
&db.UpstreamDB{Upstreams: us}
- if err :=
upstreamDB.UpdateUpstreams(); err != nil {
- glog.Errorf("solver
upstream failed, update upstream to local db failed, err: %s", err.Error())
- return
- }
+ }
+ if needToUpdate {
+ // 1.sync memDB
+ upstreamDB := &db.UpstreamDB{Upstreams:
[]*v1.Upstream{u}}
+ if err := upstreamDB.UpdateUpstreams();
err != nil {
+ glog.Errorf("solver upstream
failed, update upstream to local db failed, err: %s", err.Error())
+ errNotify = err
+ return
}
- } else {
- op = Create
- // 1.sync apisix and get response
- if upstreamResponse, err :=
apisix.AddUpstream(u); err != nil {
+ // 2.sync apisix
+ if err = apisix.UpdateUpstream(u); err
!= nil {
glog.Errorf("solver upstream
failed, update upstream to etcd failed, err: %+v", err)
+ errNotify = err
return
- } else {
- tmp :=
strings.Split(*upstreamResponse.Upstream.Key, "/")
- *u.ID = tmp[len(tmp)-1]
}
- // 2.sync memDB
-
//apisix.InsertUpstreams([]*v1.Upstream{u})
- upstreamDB := &db.UpstreamDB{Upstreams:
[]*v1.Upstream{u}}
- upstreamDB.InsertUpstreams()
}
+ // if fromKind == WatchFromKind
+ if u.FromKind != nil && *u.FromKind ==
WatchFromKind {
+ // 1.update nodes
+ if err = apisix.PatchNodes(u, u.Nodes);
err != nil {
+ glog.Errorf("solver upstream
failed, patch node info to etcd failed, err: %+v", err)
+ errNotify = err
+ return
+ }
+ // 2. sync memDB
+ us := []*v1.Upstream{u}
+ if !needToUpdate {
+ currentUpstream.Nodes = u.Nodes
+ us =
[]*v1.Upstream{currentUpstream}
+ }
+ upstreamDB := &db.UpstreamDB{Upstreams:
us}
+ if err := upstreamDB.UpdateUpstreams();
err != nil {
+ glog.Errorf("solver upstream
failed, update upstream to local db failed, err: %s", err.Error())
+ errNotify = err
+ return
+ }
+ }
+ } else {
+ op = Create
+ // 1.sync apisix and get response
+ if upstreamResponse, err :=
apisix.AddUpstream(u); err != nil {
+ glog.Errorf("solver upstream failed,
update upstream to etcd failed, err: %+v", err)
+ errNotify = err
+ return
+ } else {
+ tmp :=
strings.Split(*upstreamResponse.Upstream.Key, "/")
+ *u.ID = tmp[len(tmp)-1]
+ }
+ // 2.sync memDB
+ //apisix.InsertUpstreams([]*v1.Upstream{u})
+ upstreamDB := &db.UpstreamDB{Upstreams:
[]*v1.Upstream{u}}
+ upstreamDB.InsertUpstreams()
}
}
- glog.V(2).Infof("solver upstream %s:%s", op, *u.Name)
- // anyway, broadcast to service
- serviceWorkers := swg[*u.Name]
- for _, sw := range serviceWorkers {
- event := &Event{Kind: UpstreamKind, Op: op, Obj: u}
- sw.Event <- *event
- }
+ }
+ glog.V(2).Infof("solver upstream %s:%s", op, *u.Name)
Review comment:
Done
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]