1kasa commented on PR #3055:
URL: https://github.com/apache/dubbo-go/pull/3055#issuecomment-3421169427
I did some testing and verification,The following codes are executed
respectively :go run -race XXXX.go
old:
```
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
var (
methodWeightMap sync.Map
state int32
recyclePeriod = int64(time.Second)
)
type weightedRoundRobin struct {
weight int64
current int64
lastUpdate *time.Time
}
type cachedInvokers struct {
sync.Map
}
func main() {
key := "testMethod"
cached, _ := methodWeightMap.LoadOrStore(key, &cachedInvokers{})
c := cached.(*cachedInvokers)
for i := 0; i < 50; i++ {
go func(id int) {
for {
Select(c)
time.Sleep(10 * time.Microsecond)
}
}(i)
}
go func() {
for {
cleanIfRequired(true, c, time.Now())
time.Sleep(50 * time.Microsecond)
}
}()
time.Sleep(5 * time.Second)
fmt.Println("done (if you didn’t panic, try rerun several times)")
}
func Select(c *cachedInvokers) {
now := time.Now()
identifier := "node1"
wr := &weightedRoundRobin{weight: 10}
loaded, found := c.LoadOrStore(identifier, wr)
weightRobin := loaded.(*weightedRoundRobin)
if !found {
time.Sleep(time.Microsecond * 10)
}
weightRobin.lastUpdate = &now
}
func cleanIfRequired(clean bool, invokers *cachedInvokers, now time.Time) {
if clean && atomic.CompareAndSwapInt32(&state, 0, 1) {
defer atomic.CompareAndSwapInt32(&state, 1, 0)
invokers.Range(func(key, value any) bool {
wr := value.(*weightedRoundRobin)
if wr.lastUpdate != nil {
elapsed := now.Sub(*wr.lastUpdate).Nanoseconds()
if elapsed > recyclePeriod {
invokers.Delete(key)
}
} else {
fmt.Println(" lastUpdate == nil, race condition
detected")
}
return true
})
}
}
```
result:
<img width="1081" height="683" alt="image"
src="https://github.com/user-attachments/assets/f2b8d920-9c6a-4e4c-a22f-170694b943a6"
/>
fixed:
```
package main
import (
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
)
var (
methodWeightMap sync.Map
state = int32(0)
recyclePeriod = int64(60 * time.Second)
)
// Weighted round robin object
type weightedRoundRobin struct {
weight int64
current int64
lastUpdate atomic.Pointer[time.Time]
}
func (r *weightedRoundRobin) Weight() int64 {
return atomic.LoadInt64(&r.weight)
}
func (r *weightedRoundRobin) setWeight(w int64) {
atomic.StoreInt64(&r.weight, w)
atomic.StoreInt64(&r.current, 0)
}
func (r *weightedRoundRobin) increaseCurrent() int64 {
return atomic.AddInt64(&r.current, r.Weight())
}
func (r *weightedRoundRobin) Current(delta int64) {
atomic.AddInt64(&r.current, -1*delta)
}
func (r *weightedRoundRobin) LastUpdate() *time.Time {
return r.lastUpdate.Load()
}
func (r *weightedRoundRobin) setLastUpdate(t *time.Time) {
r.lastUpdate.Store(t)
}
// The main load balancer select logic
func Select(key string, invokers int) {
cache, _ := methodWeightMap.LoadOrStore(key, &sync.Map{})
cachedInvokers := cache.(*sync.Map)
now := time.Now()
totalWeight := int64(0)
maxCurrentWeight := int64(-1 << 63)
var selected *weightedRoundRobin
clean := false
for i := 0; i < invokers; i++ {
identifier := fmt.Sprintf("invoker-%d", i)
weight := rand.Int63n(100) + 1
wr := &weightedRoundRobin{}
wr.setWeight(weight)
wr.setLastUpdate(&now)
loaded, found := cachedInvokers.LoadOrStore(identifier, wr)
weightRobin := loaded.(*weightedRoundRobin)
if found {
weightRobin.setLastUpdate(&now)
} else {
clean = true
}
if weightRobin.Weight() != weight {
weightRobin.setWeight(weight)
}
currentWeight := weightRobin.increaseCurrent()
if currentWeight > maxCurrentWeight {
maxCurrentWeight = currentWeight
selected = weightRobin
}
totalWeight += weight
}
cleanIfRequired(clean, cachedInvokers, &now)
if selected != nil {
selected.Current(totalWeight)
}
}
// Cleaning function
func cleanIfRequired(clean bool, invokers *sync.Map, now *time.Time) {
if clean && atomic.CompareAndSwapInt32(&state, 0, 1) {
defer atomic.CompareAndSwapInt32(&state, 1, 0)
invokers.Range(func(key, value any) bool {
robin := value.(*weightedRoundRobin)
last := robin.LastUpdate()
if last == nil {
return true
}
elapsed := now.Sub(*last).Nanoseconds()
if elapsed > recyclePeriod {
invokers.Delete(key)
}
return true
})
}
}
func main() {
rand.Seed(time.Now().UnixNano())
key := "service.method"
for i := 0; i < 50; i++ {
go func() {
for j := 0; j < 2000; j++ {
Select(key, 5)
time.Sleep(time.Microsecond * 100)
}
}()
}
for i := 0; i < 3; i++ {
go func() {
for j := 0; j < 1000; j++ {
cache, ok := methodWeightMap.Load(key)
if ok {
t := time.Now()
cleanIfRequired(true,
cache.(*sync.Map), &t)
}
time.Sleep(time.Millisecond * 5)
}
}()
}
time.Sleep(3 * time.Second)
fmt.Println("done (run with -race to verify no data races)")
}
```
result: done (run with -race to verify no data races)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]