gxthrj commented on a change in pull request #149:
URL: 
https://github.com/apache/apisix-ingress-controller/pull/149#discussion_r551328438



##########
File path: pkg/seven/state/solver.go
##########
@@ -15,54 +15,99 @@
 package state
 
 import (
+       "context"
+       "errors"
        "github.com/api7/ingress-controller/pkg/seven/apisix"
        "github.com/api7/ingress-controller/pkg/seven/db"
-       v1 "github.com/api7/ingress-controller/pkg/types/apisix/v1"
+       "github.com/api7/ingress-controller/pkg/types/apisix/v1"
+       "sync"
+       "time"
 )
 
 var UpstreamQueue chan UpstreamQueueObj
-var ServiceQueue chan ServiceQueueObj
 
 func init() {
        UpstreamQueue = make(chan UpstreamQueueObj, 500)
-       ServiceQueue = make(chan ServiceQueueObj, 500)
        go WatchUpstream()
-       go WatchService()
-}
-
-func WatchService() {
-       for {
-               sqo := <-ServiceQueue
-               // solver service
-               SolverService(sqo.Services, sqo.RouteWorkerGroup)
-       }
 }
 
 func WatchUpstream() {
        for {
                uqo := <-UpstreamQueue
-               SolverUpstream(uqo.Upstreams, uqo.ServiceWorkerGroup)
+               SolverUpstream(uqo.Upstreams, uqo.ServiceWorkerGroup, uqo.Wg, 
uqo.ErrorChan)
        }
 }
 
 // Solver
-func (s *ApisixCombination) Solver() (bool, error) {
-       // 1.route workers
-       rwg := NewRouteWorkers(s.Routes)
-       // 2.service workers
-       swg := NewServiceWorkers(s.Services, &rwg)
-       //sqo := &ServiceQueueObj{Services: s.Services, RouteWorkerGroup: rwg}
-       //sqo.AddQueue()
-       // 3.upstream workers
-       uqo := &UpstreamQueueObj{Upstreams: s.Upstreams, ServiceWorkerGroup: 
swg}
+func (s *ApisixCombination) Solver() (string, error) {
+       // define the result notify
+       timeout := 15 * time.Second
+       resultChan := make(chan CRDStatus)
+       ctx := context.Background()
+       ctx, _ = context.WithTimeout(ctx, timeout)
+       go s.SyncWithGroup(ctx, "", resultChan)
+
+       // add timeout after 5s
+       return WaitWorkerGroup("", resultChan)
+}
+
+func waitTimeout(wg *sync.WaitGroup, timeout time.Duration, resultChan chan 
CRDStatus) {
+       c := make(chan struct{})
+       go func() {
+               defer close(c)
+               wg.Wait()
+       }()
+       select {
+       case <-c:
+               resultChan <- CRDStatus{Id: "", Status: "success", Err: nil}
+       case <-time.After(timeout):
+               resultChan <- CRDStatus{Id: "", Status: "failure", Err: 
errors.New("timeout")}
+       }
+}
+
+func (s *ApisixCombination) SyncWithGroup(ctx context.Context, id string, 
resultChan chan CRDStatus) {
+       var wg sync.WaitGroup
+       count := len(s.Routes) + len(s.Services) + len(s.Upstreams)
+       wg.Add(count)
+       // goroutine for sync route/service/upstream
+       // route
+       rwg := NewRouteWorkers(ctx, s.Routes, &wg, resultChan)
+       // service
+       swg := NewServiceWorkers(ctx, s.Services, &rwg, &wg, resultChan)
+       // upstream
+       uqo := &UpstreamQueueObj{Upstreams: s.Upstreams, ServiceWorkerGroup: 
swg, Wg: &wg, ErrorChan: resultChan}
        uqo.AddQueue()
-       return true, nil
+       // waitTimeout should be shorter than worker timeout
+       waitTimeout(&wg, 10*time.Second, resultChan)
+}
+
+func WaitWorkerGroup(id string, resultChan chan CRDStatus) (string, error) {
+       for {

Review comment:
       Done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to