This is an automated email from the ASF dual-hosted git repository.
tew pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-seata-go.git
The following commit(s) were added to refs/heads/master by this push:
new b90d3ae0 optimize: enhance RoundRobinLoadBalance performance (#995)
b90d3ae0 is described below
commit b90d3ae0a444ba7aab6cf4e51cfbeb3eb3bcc077
Author: EVERFID <[email protected]>
AuthorDate: Fri Dec 19 11:33:59 2025 +0800
optimize: enhance RoundRobinLoadBalance performance (#995)
* optimize: enhance RoundRobinLoadBalance performance
* add test
* fix
* add configurable retry for round-robin selector
---------
Co-authored-by: Tew <[email protected]>
Co-authored-by: TewGuo <[email protected]>
---
.../loadbalance/round_robin_loadbalance.go | 167 +++++++++++++++++++--
.../loadbalance/round_robin_loadbalance_test.go | 100 ++++++++++++
2 files changed, 253 insertions(+), 14 deletions(-)
diff --git a/pkg/remoting/loadbalance/round_robin_loadbalance.go b/pkg/remoting/loadbalance/round_robin_loadbalance.go
index 9cebc926..b6da07ab 100644
--- a/pkg/remoting/loadbalance/round_robin_loadbalance.go
+++ b/pkg/remoting/loadbalance/round_robin_loadbalance.go
@@ -28,29 +28,168 @@ import (
var sequence int32
+// RoundRobinConfig holds configuration for round-robin load balancing
+type RoundRobinConfig struct {
+ MaxRetries int
+}
+
+// DefaultRoundRobinConfig provides sensible defaults
+var DefaultRoundRobinConfig = RoundRobinConfig{
+ MaxRetries: 3, // Balanced default: enough attempts without excessive overhead
+}
+
+type rrSnapshot struct {
+ sessions []getty.Session
+}
+
+type rrSelector struct {
+ sessions *sync.Map
+ snapshot atomic.Value // stores *rrSnapshot
+ mu sync.Mutex
+ maxRetries int
+}
+
+// RoundRobinLoadBalance selects a session using round-robin algorithm with default config
func RoundRobinLoadBalance(sessions *sync.Map, s string) getty.Session {
- // collect sync.Map adderToSession
- // filter out closed session instance
- adderToSession := make(map[string]getty.Session, 0)
- // map has no sequence, we should sort it to make sure the sequence is always the same
- adders := make([]string, 0)
- sessions.Range(func(key, value interface{}) bool {
+ return RoundRobinLoadBalanceWithConfig(sessions, s, DefaultRoundRobinConfig)
+}
+
+func RoundRobinLoadBalanceWithConfig(sessions *sync.Map, s string, config RoundRobinConfig) getty.Session {
+ maxRetries := config.MaxRetries
+
+ // Validate and normalize
+ // Note: 0 is valid (means no retry, directly rebuild on first failure)
+ if maxRetries < 0 {
+ maxRetries = DefaultRoundRobinConfig.MaxRetries
+ }
+ if maxRetries > 10 {
+ maxRetries = 10
+ }
+
+ selector := &rrSelector{
+ sessions: sessions,
+ maxRetries: maxRetries,
+ }
+ selector.snapshot.Store((*rrSnapshot)(nil))
+
+ seq := getPositiveSequence()
+ return selector.selectWithSeq(seq)
+}
+
+func (r *rrSelector) getValidSnapshot() *rrSnapshot {
+ v := r.snapshot.Load()
+ if v == nil {
+ return nil
+ }
+ snap := v.(*rrSnapshot)
+ if snap == nil || len(snap.sessions) == 0 {
+ return nil
+ }
+ return snap
+}
+
+func (r *rrSelector) selectWithSeq(seq int) getty.Session {
+ // if maxRetries is 0, skip retry loop entirely
+ if r.maxRetries == 0 {
+ snap := r.getValidSnapshot()
+ if snap != nil && len(snap.sessions) > 0 {
+ idx := seq % len(snap.sessions)
+ session := snap.sessions[idx]
+ if !session.IsClosed() {
+ return session
+ }
+ }
+ // first attempt failed, rebuild immediately
+ return r.rebuildWithSeq(seq)
+ }
+
+ for retry := 0; retry < r.maxRetries; retry++ {
+ snap := r.getValidSnapshot()
+ if snap != nil {
+ n := len(snap.sessions)
+ if n > 0 {
+ idx := (seq + retry) % n
+ session := snap.sessions[idx]
+ if !session.IsClosed() {
+ return session
+ }
+ }
+ }
+
+ if retry < r.maxRetries-1 && snap != nil {
+ continue
+ }
+
+ break
+ }
+
+ return r.rebuildWithSeq(seq)
+}
+
+func (r *rrSelector) rebuildWithSeq(seq int) getty.Session {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+
+ snap := r.getValidSnapshot()
+ if snap != nil {
+ n := len(snap.sessions)
+ if n > 0 {
+ // try to find an open session starting from the calculated index
+ for i := 0; i < n; i++ {
+ idx := (seq + i) % n
+ session := snap.sessions[idx]
+ if !session.IsClosed() {
+ return session
+ }
+ }
+ }
+ }
+
+ addrToSession := make(map[string]getty.Session)
+ toDelete := make([]interface{}, 0)
+
+ r.sessions.Range(func(key, value interface{}) bool {
session := key.(getty.Session)
if session.IsClosed() {
- sessions.Delete(key)
+ toDelete = append(toDelete, key)
} else {
- adderToSession[session.RemoteAddr()] = session
- adders = append(adders, session.RemoteAddr())
+ addr := session.RemoteAddr()
+ addrToSession[addr] = session
}
return true
})
- sort.Strings(adders)
- // adderToSession eq 0 means there are no available session
- if len(adderToSession) == 0 {
+
+ // delete closed sessions synchronously
+ for _, k := range toDelete {
+ r.sessions.Delete(k)
+ }
+
+ if len(addrToSession) == 0 {
+ r.snapshot.Store((*rrSnapshot)(nil))
return nil
}
- index := getPositiveSequence() % len(adderToSession)
- return adderToSession[adders[index]]
+
+ // sort by address to ensure consistent order
+ addrs := make([]string, 0, len(addrToSession))
+ for addr := range addrToSession {
+ addrs = append(addrs, addr)
+ }
+ sort.Strings(addrs)
+
+ // build session list from sorted addresses
+ sessions := make([]getty.Session, len(addrs))
+ for i, addr := range addrs {
+ sessions[i] = addrToSession[addr]
+ }
+
+ // store new snapshot
+ newSnap := &rrSnapshot{sessions: sessions}
+ r.snapshot.Store(newSnap)
+
+ // select session using the same seq
+ n := len(sessions)
+ idx := seq % n
+ return sessions[idx]
}
func getPositiveSequence() int {
diff --git a/pkg/remoting/loadbalance/round_robin_loadbalance_test.go b/pkg/remoting/loadbalance/round_robin_loadbalance_test.go
index b54f19f8..9281077d 100644
--- a/pkg/remoting/loadbalance/round_robin_loadbalance_test.go
+++ b/pkg/remoting/loadbalance/round_robin_loadbalance_test.go
@@ -23,6 +23,7 @@ import (
"sync"
"testing"
+ getty "github.com/apache/dubbo-getty"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
@@ -98,3 +99,102 @@ func TestRoundRobinLoadBalance_Empty(t *testing.T) {
t.Errorf("Expected nil, actual got %+v", result)
}
}
+
+func TestRoundRobinLoadBalance_ConcurrentAccess(t *testing.T) {
+ ctrl := gomock.NewController(t)
+ sessions := &sync.Map{}
+
+ for i := 0; i < 5; i++ {
+ session := mock.NewMockTestSession(ctrl)
+ session.EXPECT().IsClosed().Return(false).AnyTimes()
+ session.EXPECT().RemoteAddr().Return(fmt.Sprintf("addr-%d", i)).AnyTimes()
+ sessions.Store(session, fmt.Sprintf("session-%d", i))
+ }
+
+ var wg sync.WaitGroup
+ results := make([]getty.Session, 100)
+ for i := 0; i < 100; i++ {
+ wg.Add(1)
+ go func(idx int) {
+ defer wg.Done()
+ results[idx] = RoundRobinLoadBalance(sessions, "xid")
+ }(i)
+ }
+ wg.Wait()
+
+ for i, result := range results {
+ assert.NotNil(t, result, "Result at index %d should not be nil", i)
+ assert.False(t, result.IsClosed())
+ }
+}
+
+func TestRoundRobinLoadBalance_SelectedSessionClosed(t *testing.T) {
+ ctrl := gomock.NewController(t)
+ sessions := &sync.Map{}
+
+ callCount := 0
+ session1 := mock.NewMockTestSession(ctrl)
+ session1.EXPECT().IsClosed().DoAndReturn(func() bool {
+ callCount++
+ return callCount > 2
+ }).AnyTimes()
+ session1.EXPECT().RemoteAddr().Return("addr-0").AnyTimes()
+
+ session2 := mock.NewMockTestSession(ctrl)
+ session2.EXPECT().IsClosed().Return(false).AnyTimes()
+ session2.EXPECT().RemoteAddr().Return("addr-1").AnyTimes()
+
+ sessions.Store(session1, "session-1")
+ sessions.Store(session2, "session-2")
+
+ result1 := RoundRobinLoadBalance(sessions, "xid")
+ assert.NotNil(t, result1)
+
+ result2 := RoundRobinLoadBalance(sessions, "xid")
+ assert.NotNil(t, result2)
+
+ result3 := RoundRobinLoadBalance(sessions, "xid")
+ assert.NotNil(t, result3)
+}
+
+func TestRRSelector_GetValidSnapshot_Nil(t *testing.T) {
+ selector := &rrSelector{sessions: &sync.Map{}}
+ selector.snapshot.Store((*rrSnapshot)(nil))
+
+ snap := selector.getValidSnapshot()
+ assert.Nil(t, snap, "Should return nil for nil snapshot")
+}
+
+func TestRRSelector_GetValidSnapshot_EmptySessions(t *testing.T) {
+ selector := &rrSelector{sessions: &sync.Map{}}
+ selector.snapshot.Store(&rrSnapshot{sessions: []getty.Session{}})
+
+ snap := selector.getValidSnapshot()
+ assert.Nil(t, snap, "Should return nil for empty sessions")
+}
+
+func TestRRSelector_RebuildWithSeq_DeleteClosedSessions(t *testing.T) {
+ ctrl := gomock.NewController(t)
+ sessions := &sync.Map{}
+
+ closedSession := mock.NewMockTestSession(ctrl)
+ closedSession.EXPECT().IsClosed().Return(true).AnyTimes()
+
+ openSession := mock.NewMockTestSession(ctrl)
+ openSession.EXPECT().IsClosed().Return(false).AnyTimes()
+ openSession.EXPECT().RemoteAddr().Return("addr-1").AnyTimes()
+
+ sessions.Store(closedSession, "closed")
+ sessions.Store(openSession, "open")
+
+ result := RoundRobinLoadBalance(sessions, "xid")
+
+ count := 0
+ sessions.Range(func(key, value interface{}) bool {
+ count++
+ return true
+ })
+ assert.Equal(t, 1, count, "Should only have 1 session left")
+ assert.NotNil(t, result)
+ assert.Equal(t, "addr-1", result.RemoteAddr())
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]