1kasa commented on PR #3055:
URL: https://github.com/apache/dubbo-go/pull/3055#issuecomment-3421169427

   I did some testing and verification. Each of the following programs was
run with: go run -race XXXX.go
   old:
   ```
   package main
   
   import (
        "fmt"
        "sync"
        "sync/atomic"
        "time"
   )
   
   // Package-level state shared by every goroutine in this reproduction.
   var (
        // methodWeightMap maps a method key to its *cachedInvokers.
        methodWeightMap sync.Map
        // state is the 0/1 flag that cleanIfRequired flips with CAS so that
        // only one cleanup pass runs at a time.
        state           int32
        // recyclePeriod is the idle threshold in nanoseconds (one second);
        // cleanIfRequired deletes entries whose lastUpdate is older than this.
        recyclePeriod   = int64(time.Second)
   )
   
   // weightedRoundRobin is the per-invoker record stored in cachedInvokers.
   // lastUpdate is a plain pointer: Select assigns it and cleanIfRequired
   // reads it with no synchronization between the two.
   type weightedRoundRobin struct {
        weight     int64
        current    int64
        lastUpdate *time.Time
   }
   
   // cachedInvokers embeds sync.Map; in this program it holds
   // identifier -> *weightedRoundRobin entries.
   type cachedInvokers struct {
        sync.Map
   }
   
   // main is the race-reproduction harness: 50 goroutines call Select in a
   // tight loop while one goroutine repeatedly runs cleanIfRequired on the
   // same cachedInvokers. None of the goroutines ever returns; they are only
   // ended by the process exiting after the five-second sleep below.
   func main() {
        key := "testMethod"
        cached, _ := methodWeightMap.LoadOrStore(key, &cachedInvokers{})
        c := cached.(*cachedInvokers)
        
        // Writers: each iteration may (re)create the "node1" entry and then
        // assigns its lastUpdate field.
        for i := 0; i < 50; i++ {
                go func(id int) {
                        for {
                                Select(c)
                                time.Sleep(10 * time.Microsecond)
                        }
                }(i)
        }
        
        // Reader/cleaner: always requests a cleanup pass (clean == true).
        go func() {
                for {
                        cleanIfRequired(true, c, time.Now())
                        time.Sleep(50 * time.Microsecond)
                }
        }()
   
        // Let the goroutines contend for a fixed wall-clock window.
        time.Sleep(5 * time.Second)
        fmt.Println("done (if you didn’t panic, try rerun several times)")
   }
   
   // Select registers (or looks up) the single "node1" entry in c and then
   // stamps it with the current time. The entry is published by LoadOrStore
   // before lastUpdate is assigned, so a concurrent Range over c can observe
   // it with lastUpdate still nil.
   func Select(c *cachedInvokers) {
        now := time.Now()
        identifier := "node1"
        
        wr := &weightedRoundRobin{weight: 10}
        loaded, found := c.LoadOrStore(identifier, wr)
        weightRobin := loaded.(*weightedRoundRobin)
        if !found {
                // This call stored wr itself; the sleep widens the window in
                // which the freshly stored entry has no lastUpdate yet.
                time.Sleep(time.Microsecond * 10)
        }
        // Plain pointer assignment with no lock or atomic, concurrent with
        // the read in cleanIfRequired.
        weightRobin.lastUpdate = &now
   }
   
   func cleanIfRequired(clean bool, invokers *cachedInvokers, now time.Time) {
        if clean && atomic.CompareAndSwapInt32(&state, 0, 1) {
                defer atomic.CompareAndSwapInt32(&state, 1, 0)
                invokers.Range(func(key, value any) bool {
                        wr := value.(*weightedRoundRobin)
                        if wr.lastUpdate != nil {
                                elapsed := now.Sub(*wr.lastUpdate).Nanoseconds()
                                if elapsed > recyclePeriod {
                                        invokers.Delete(key)
                                }
                        } else {
                                fmt.Println(" lastUpdate == nil, race condition 
detected")
                        }
                        return true
                })
        }
   }
   
   ```
   result:
   <img width="1081" height="683" alt="image" 
src="https://github.com/user-attachments/assets/f2b8d920-9c6a-4e4c-a22f-170694b943a6"
 />
   
   fixed:
   ```
   package main
   
   import (
        "fmt"
        "math/rand"
        "sync"
        "sync/atomic"
        "time"
   )
   
   // Package-level state shared by every goroutine in this verification run.
   var (
        // methodWeightMap maps a method key to the *sync.Map holding that
        // method's identifier -> *weightedRoundRobin entries.
        methodWeightMap sync.Map
        // state is the 0/1 flag that cleanIfRequired flips with CAS so that
        // only one cleanup pass runs at a time.
        state           = int32(0)
        // recyclePeriod is the idle threshold in nanoseconds (60 seconds);
        // cleanIfRequired deletes entries whose last update is older than this.
        recyclePeriod   = int64(60 * time.Second)
   )
   
   // weightedRoundRobin is the per-invoker weighted round-robin record.
   //
   // Every field is a typed atomic, so the type can only be read or written
   // through atomic operations; a plain, racy field access no longer compiles.
   // Values must be used through a pointer and never copied.
   type weightedRoundRobin struct {
        weight     atomic.Int64
        current    atomic.Int64
        lastUpdate atomic.Pointer[time.Time]
   }
   
   // Weight returns the weight most recently stored by setWeight.
   func (r *weightedRoundRobin) Weight() int64 {
        return r.weight.Load()
   }
   
   // setWeight stores w as the weight and resets the running current value to
   // zero. The two stores are individually atomic, not atomic as a pair.
   func (r *weightedRoundRobin) setWeight(w int64) {
        r.weight.Store(w)
        r.current.Store(0)
   }
   
   // increaseCurrent adds the present weight to current and returns the new
   // current value.
   func (r *weightedRoundRobin) increaseCurrent() int64 {
        return r.current.Add(r.Weight())
   }
   
   // Current subtracts delta from the running current value. Despite the
   // getter-like name it returns nothing.
   func (r *weightedRoundRobin) Current(delta int64) {
        r.current.Add(-delta)
   }
   
   // LastUpdate returns the timestamp stored by setLastUpdate, or nil when
   // none has been stored yet.
   func (r *weightedRoundRobin) LastUpdate() *time.Time {
        return r.lastUpdate.Load()
   }
   
   // setLastUpdate atomically publishes t as the last-update timestamp.
   func (r *weightedRoundRobin) setLastUpdate(t *time.Time) {
        r.lastUpdate.Store(t)
   }
   
   // Select performs one weighted round-robin pass over `invokers` simulated
   // invokers registered under key in methodWeightMap.
   //
   // For each invoker it draws a random weight in [1, 100], makes sure a record
   // exists in the per-method map, refreshes its timestamp and weight, and bumps
   // its current value. The record with the highest current value is then
   // reduced by the sum of all weights. A cleanup pass is requested whenever at
   // least one record had to be created during this call.
   func Select(key string, invokers int) {
        entry, _ := methodWeightMap.LoadOrStore(key, &sync.Map{})
        perMethod := entry.(*sync.Map)
   
        var (
                now        = time.Now()
                sum        int64
                best       = int64(-1 << 63)
                winner     *weightedRoundRobin
                needsClean bool
        )
   
        for idx := 0; idx < invokers; idx++ {
                id := fmt.Sprintf("invoker-%d", idx)
                w := rand.Int63n(100) + 1
   
                // Fully initialise the candidate before it can become visible
                // to other goroutines through LoadOrStore.
                fresh := new(weightedRoundRobin)
                fresh.setWeight(w)
                fresh.setLastUpdate(&now)
   
                actual, existed := perMethod.LoadOrStore(id, fresh)
                robin := actual.(*weightedRoundRobin)
   
                if !existed {
                        // A new record was inserted: ask for a cleanup pass.
                        needsClean = true
                } else {
                        robin.setLastUpdate(&now)
                }
   
                if robin.Weight() != w {
                        robin.setWeight(w)
                }
   
                if cur := robin.increaseCurrent(); cur > best {
                        best, winner = cur, robin
                }
                sum += w
        }
   
        cleanIfRequired(needsClean, perMethod, &now)
   
        if winner == nil {
                return
        }
        winner.Current(sum)
   }
   
   // cleanIfRequired deletes every record in invokers whose last update lies
   // more than recyclePeriod nanoseconds before *now.
   //
   // The pass only runs when clean is true and the package-level state flag
   // can be switched from 0 to 1; the deferred CAS switches it back afterwards.
   // Records that have no timestamp yet are left untouched.
   func cleanIfRequired(clean bool, invokers *sync.Map, now *time.Time) {
        if !clean || !atomic.CompareAndSwapInt32(&state, 0, 1) {
                return
        }
        defer atomic.CompareAndSwapInt32(&state, 1, 0)
   
        invokers.Range(func(key, value any) bool {
                stamp := value.(*weightedRoundRobin).LastUpdate()
                if stamp != nil && now.Sub(*stamp) > time.Duration(recyclePeriod) {
                        invokers.Delete(key)
                }
                return true
        })
   }
   
   // main drives the verification run: 50 goroutines each call Select 2000
   // times while 3 goroutines each request 1000 forced cleanup passes on the
   // same per-method map.
   //
   // main waits for every goroutine to finish before reporting, rather than
   // sleeping for a fixed interval. The cleanup goroutines alone need at least
   // 1000 * 5ms = 5s, so a fixed 3s sleep would print "done" with work still
   // in flight.
   func main() {
        // NOTE(review): rand.Seed is deprecated as of Go 1.20, where the
        // global source is seeded automatically. It is kept here so that
        // weights still vary between runs on older toolchains.
        rand.Seed(time.Now().UnixNano())
   
        key := "service.method"
   
        var wg sync.WaitGroup
   
        // Selectors: create/refresh the records and update their counters.
        for i := 0; i < 50; i++ {
                wg.Add(1)
                go func() {
                        defer wg.Done()
                        for j := 0; j < 2000; j++ {
                                Select(key, 5)
                                time.Sleep(time.Microsecond * 100)
                        }
                }()
        }
   
        // Cleaners: force a cleanup pass once the per-method map exists.
        for i := 0; i < 3; i++ {
                wg.Add(1)
                go func() {
                        defer wg.Done()
                        for j := 0; j < 1000; j++ {
                                cache, ok := methodWeightMap.Load(key)
                                if ok {
                                        t := time.Now()
                                        cleanIfRequired(true, cache.(*sync.Map), &t)
                                }
                                time.Sleep(time.Millisecond * 5)
                        }
                }()
        }
   
        wg.Wait()
        fmt.Println("done (run with -race to verify no data races)")
   }
   
   ```
   result: done (run with -race to verify no data races)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to