gxthrj commented on a change in pull request #149:
URL: 
https://github.com/apache/apisix-ingress-controller/pull/149#discussion_r551768201



##########
File path: pkg/seven/state/solver.go
##########
@@ -15,54 +15,96 @@
 package state
 
 import (
+       "context"
+       "errors"
+       "sync"
+       "time"
+
        "github.com/api7/ingress-controller/pkg/seven/apisix"
        "github.com/api7/ingress-controller/pkg/seven/db"
-       v1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
+       "github.com/api7/ingress-controller/pkg/types/apisix/v1"
 )
 
 var UpstreamQueue chan UpstreamQueueObj
-var ServiceQueue chan ServiceQueueObj
 
 func init() {
        UpstreamQueue = make(chan UpstreamQueueObj, 500)
-       ServiceQueue = make(chan ServiceQueueObj, 500)
        go WatchUpstream()
-       go WatchService()
-}
-
-func WatchService() {
-       for {
-               sqo := <-ServiceQueue
-               // solver service
-               SolverService(sqo.Services, sqo.RouteWorkerGroup)
-       }
 }
 
 func WatchUpstream() {
        for {
                uqo := <-UpstreamQueue
-               SolverUpstream(uqo.Upstreams, uqo.ServiceWorkerGroup)
+               SolverUpstream(uqo.Upstreams, uqo.ServiceWorkerGroup, uqo.Wg, 
uqo.ErrorChan)
        }
 }
 
 // Solver
-func (s *ApisixCombination) Solver() (bool, error) {
-       // 1.route workers
-       rwg := NewRouteWorkers(s.Routes)
-       // 2.service workers
-       swg := NewServiceWorkers(s.Services, &rwg)
-       //sqo := &ServiceQueueObj{Services: s.Services, RouteWorkerGroup: rwg}
-       //sqo.AddQueue()
-       // 3.upstream workers
-       uqo := &UpstreamQueueObj{Upstreams: s.Upstreams, ServiceWorkerGroup: 
swg}
+func (s *ApisixCombination) Solver() (string, error) {
+       // define the result notify
+       timeout := 1500 * time.Second
+       resultChan := make(chan CRDStatus)
+       ctx := context.Background()
+       ctx, _ = context.WithTimeout(ctx, timeout)
+       go s.SyncWithGroup(ctx, "", resultChan)
+
+       // add timeout after 5s
+       return WaitWorkerGroup("", resultChan)
+}
+
+func waitTimeout(wg *sync.WaitGroup, timeout time.Duration, resultChan chan 
CRDStatus) {
+       c := make(chan struct{})
+       go func() {
+               defer close(c)
+               wg.Wait()
+       }()
+       select {
+       case <-c:
+               resultChan <- CRDStatus{Id: "", Status: "success", Err: nil}
+       case <-time.After(timeout):
+               resultChan <- CRDStatus{Id: "", Status: "failure", Err: 
errors.New("timeout")}
+       }
+}
+
+func (s *ApisixCombination) SyncWithGroup(ctx context.Context, id string, 
resultChan chan CRDStatus) {
+       var wg sync.WaitGroup
+       count := len(s.Routes) + len(s.Services) + len(s.Upstreams)
+       wg.Add(count)
+       // goroutine for sync route/service/upstream
+       // route
+       rwg := NewRouteWorkers(ctx, s.Routes, &wg, resultChan)
+       // service
+       swg := NewServiceWorkers(ctx, s.Services, &rwg, &wg, resultChan)
+       // upstream
+       uqo := &UpstreamQueueObj{Upstreams: s.Upstreams, ServiceWorkerGroup: 
swg, Wg: &wg, ErrorChan: resultChan}
        uqo.AddQueue()
-       return true, nil
+       // waitTimeout should be shorter than worker timeout
+       waitTimeout(&wg, 100*time.Second, resultChan)

Review comment:
       100 is wrong, I set 10s here for WaitGroup timeout. less than 15s  
timetout for workers.
   It seems not neccessary to do this. I have changed to use ctx.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to