This is an automated email from the ASF dual-hosted git repository.

tew pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-seata-go.git


The following commit(s) were added to refs/heads/master by this push:
     new b90d3ae0 optimize: enhance RoundRobinLoadBalance performance (#995)
b90d3ae0 is described below

commit b90d3ae0a444ba7aab6cf4e51cfbeb3eb3bcc077
Author: EVERFID <[email protected]>
AuthorDate: Fri Dec 19 11:33:59 2025 +0800

    optimize: enhance RoundRobinLoadBalance performance (#995)
    
    * optimize: enhance RoundRobinLoadBalance performance
    
    * add test
    
    * fix
    
    * add configurable retry for round-robin selector
    
    ---------
    
    Co-authored-by: Tew <[email protected]>
    Co-authored-by: TewGuo <[email protected]>
---
 .../loadbalance/round_robin_loadbalance.go         | 167 +++++++++++++++++++--
 .../loadbalance/round_robin_loadbalance_test.go    | 100 ++++++++++++
 2 files changed, 253 insertions(+), 14 deletions(-)

diff --git a/pkg/remoting/loadbalance/round_robin_loadbalance.go 
b/pkg/remoting/loadbalance/round_robin_loadbalance.go
index 9cebc926..b6da07ab 100644
--- a/pkg/remoting/loadbalance/round_robin_loadbalance.go
+++ b/pkg/remoting/loadbalance/round_robin_loadbalance.go
@@ -28,29 +28,168 @@ import (
 
 var sequence int32
 
+// RoundRobinConfig holds configuration for round-robin load balancing
+type RoundRobinConfig struct {
+       MaxRetries int
+}
+
+// DefaultRoundRobinConfig provides sensible defaults
+var DefaultRoundRobinConfig = RoundRobinConfig{
+       MaxRetries: 3, // Balanced default: enough attempts without excessive 
overhead
+}
+
+type rrSnapshot struct {
+       sessions []getty.Session
+}
+
+type rrSelector struct {
+       sessions   *sync.Map
+       snapshot   atomic.Value // stores *rrSnapshot
+       mu         sync.Mutex
+       maxRetries int
+}
+
+// RoundRobinLoadBalance selects a session using round-robin algorithm with 
default config
 func RoundRobinLoadBalance(sessions *sync.Map, s string) getty.Session {
-       // collect sync.Map adderToSession
-       // filter out closed session instance
-       adderToSession := make(map[string]getty.Session, 0)
-       // map has no sequence, we should sort it to make sure the sequence is 
always the same
-       adders := make([]string, 0)
-       sessions.Range(func(key, value interface{}) bool {
+       return RoundRobinLoadBalanceWithConfig(sessions, s, 
DefaultRoundRobinConfig)
+}
+
+func RoundRobinLoadBalanceWithConfig(sessions *sync.Map, s string, config 
RoundRobinConfig) getty.Session {
+       maxRetries := config.MaxRetries
+
+       // Validate and normalize
+       // Note: 0 is valid (means no retry, directly rebuild on first failure)
+       if maxRetries < 0 {
+               maxRetries = DefaultRoundRobinConfig.MaxRetries
+       }
+       if maxRetries > 10 {
+               maxRetries = 10
+       }
+
+       selector := &rrSelector{
+               sessions:   sessions,
+               maxRetries: maxRetries,
+       }
+       selector.snapshot.Store((*rrSnapshot)(nil))
+
+       seq := getPositiveSequence()
+       return selector.selectWithSeq(seq)
+}
+
+func (r *rrSelector) getValidSnapshot() *rrSnapshot {
+       v := r.snapshot.Load()
+       if v == nil {
+               return nil
+       }
+       snap := v.(*rrSnapshot)
+       if snap == nil || len(snap.sessions) == 0 {
+               return nil
+       }
+       return snap
+}
+
+func (r *rrSelector) selectWithSeq(seq int) getty.Session {
+       // if maxRetries is 0, skip retry loop entirely
+       if r.maxRetries == 0 {
+               snap := r.getValidSnapshot()
+               if snap != nil && len(snap.sessions) > 0 {
+                       idx := seq % len(snap.sessions)
+                       session := snap.sessions[idx]
+                       if !session.IsClosed() {
+                               return session
+                       }
+               }
+               // first attempt failed, rebuild immediately
+               return r.rebuildWithSeq(seq)
+       }
+
+       for retry := 0; retry < r.maxRetries; retry++ {
+               snap := r.getValidSnapshot()
+               if snap != nil {
+                       n := len(snap.sessions)
+                       if n > 0 {
+                               idx := (seq + retry) % n
+                               session := snap.sessions[idx]
+                               if !session.IsClosed() {
+                                       return session
+                               }
+                       }
+               }
+
+               if retry < r.maxRetries-1 && snap != nil {
+                       continue
+               }
+
+               break
+       }
+
+       return r.rebuildWithSeq(seq)
+}
+
+func (r *rrSelector) rebuildWithSeq(seq int) getty.Session {
+       r.mu.Lock()
+       defer r.mu.Unlock()
+
+       snap := r.getValidSnapshot()
+       if snap != nil {
+               n := len(snap.sessions)
+               if n > 0 {
+                       // try to find an open session starting from the 
calculated index
+                       for i := 0; i < n; i++ {
+                               idx := (seq + i) % n
+                               session := snap.sessions[idx]
+                               if !session.IsClosed() {
+                                       return session
+                               }
+                       }
+               }
+       }
+
+       addrToSession := make(map[string]getty.Session)
+       toDelete := make([]interface{}, 0)
+
+       r.sessions.Range(func(key, value interface{}) bool {
                session := key.(getty.Session)
                if session.IsClosed() {
-                       sessions.Delete(key)
+                       toDelete = append(toDelete, key)
                } else {
-                       adderToSession[session.RemoteAddr()] = session
-                       adders = append(adders, session.RemoteAddr())
+                       addr := session.RemoteAddr()
+                       addrToSession[addr] = session
                }
                return true
        })
-       sort.Strings(adders)
-       // adderToSession eq 0 means there are no available session
-       if len(adderToSession) == 0 {
+
+       // delete closed sessions synchronously
+       for _, k := range toDelete {
+               r.sessions.Delete(k)
+       }
+
+       if len(addrToSession) == 0 {
+               r.snapshot.Store((*rrSnapshot)(nil))
                return nil
        }
-       index := getPositiveSequence() % len(adderToSession)
-       return adderToSession[adders[index]]
+
+       // sort by address to ensure consistent order
+       addrs := make([]string, 0, len(addrToSession))
+       for addr := range addrToSession {
+               addrs = append(addrs, addr)
+       }
+       sort.Strings(addrs)
+
+       // build session list from sorted addresses
+       sessions := make([]getty.Session, len(addrs))
+       for i, addr := range addrs {
+               sessions[i] = addrToSession[addr]
+       }
+
+       // store new snapshot
+       newSnap := &rrSnapshot{sessions: sessions}
+       r.snapshot.Store(newSnap)
+
+       // select session using the same seq
+       n := len(sessions)
+       idx := seq % n
+       return sessions[idx]
 }
 
 func getPositiveSequence() int {
diff --git a/pkg/remoting/loadbalance/round_robin_loadbalance_test.go 
b/pkg/remoting/loadbalance/round_robin_loadbalance_test.go
index b54f19f8..9281077d 100644
--- a/pkg/remoting/loadbalance/round_robin_loadbalance_test.go
+++ b/pkg/remoting/loadbalance/round_robin_loadbalance_test.go
@@ -23,6 +23,7 @@ import (
        "sync"
        "testing"
 
+       getty "github.com/apache/dubbo-getty"
        "github.com/golang/mock/gomock"
        "github.com/stretchr/testify/assert"
 
@@ -98,3 +99,102 @@ func TestRoundRobinLoadBalance_Empty(t *testing.T) {
                t.Errorf("Expected nil, actual got %+v", result)
        }
 }
+
+func TestRoundRobinLoadBalance_ConcurrentAccess(t *testing.T) {
+       ctrl := gomock.NewController(t)
+       sessions := &sync.Map{}
+
+       for i := 0; i < 5; i++ {
+               session := mock.NewMockTestSession(ctrl)
+               session.EXPECT().IsClosed().Return(false).AnyTimes()
+               session.EXPECT().RemoteAddr().Return(fmt.Sprintf("addr-%d", 
i)).AnyTimes()
+               sessions.Store(session, fmt.Sprintf("session-%d", i))
+       }
+
+       var wg sync.WaitGroup
+       results := make([]getty.Session, 100)
+       for i := 0; i < 100; i++ {
+               wg.Add(1)
+               go func(idx int) {
+                       defer wg.Done()
+                       results[idx] = RoundRobinLoadBalance(sessions, "xid")
+               }(i)
+       }
+       wg.Wait()
+
+       for i, result := range results {
+               assert.NotNil(t, result, "Result at index %d should not be 
nil", i)
+               assert.False(t, result.IsClosed())
+       }
+}
+
+func TestRoundRobinLoadBalance_SelectedSessionClosed(t *testing.T) {
+       ctrl := gomock.NewController(t)
+       sessions := &sync.Map{}
+
+       callCount := 0
+       session1 := mock.NewMockTestSession(ctrl)
+       session1.EXPECT().IsClosed().DoAndReturn(func() bool {
+               callCount++
+               return callCount > 2
+       }).AnyTimes()
+       session1.EXPECT().RemoteAddr().Return("addr-0").AnyTimes()
+
+       session2 := mock.NewMockTestSession(ctrl)
+       session2.EXPECT().IsClosed().Return(false).AnyTimes()
+       session2.EXPECT().RemoteAddr().Return("addr-1").AnyTimes()
+
+       sessions.Store(session1, "session-1")
+       sessions.Store(session2, "session-2")
+
+       result1 := RoundRobinLoadBalance(sessions, "xid")
+       assert.NotNil(t, result1)
+
+       result2 := RoundRobinLoadBalance(sessions, "xid")
+       assert.NotNil(t, result2)
+
+       result3 := RoundRobinLoadBalance(sessions, "xid")
+       assert.NotNil(t, result3)
+}
+
+func TestRRSelector_GetValidSnapshot_Nil(t *testing.T) {
+       selector := &rrSelector{sessions: &sync.Map{}}
+       selector.snapshot.Store((*rrSnapshot)(nil))
+
+       snap := selector.getValidSnapshot()
+       assert.Nil(t, snap, "Should return nil for nil snapshot")
+}
+
+func TestRRSelector_GetValidSnapshot_EmptySessions(t *testing.T) {
+       selector := &rrSelector{sessions: &sync.Map{}}
+       selector.snapshot.Store(&rrSnapshot{sessions: []getty.Session{}})
+
+       snap := selector.getValidSnapshot()
+       assert.Nil(t, snap, "Should return nil for empty sessions")
+}
+
+func TestRRSelector_RebuildWithSeq_DeleteClosedSessions(t *testing.T) {
+       ctrl := gomock.NewController(t)
+       sessions := &sync.Map{}
+
+       closedSession := mock.NewMockTestSession(ctrl)
+       closedSession.EXPECT().IsClosed().Return(true).AnyTimes()
+
+       openSession := mock.NewMockTestSession(ctrl)
+       openSession.EXPECT().IsClosed().Return(false).AnyTimes()
+       openSession.EXPECT().RemoteAddr().Return("addr-1").AnyTimes()
+
+       sessions.Store(closedSession, "closed")
+       sessions.Store(openSession, "open")
+
+       result := RoundRobinLoadBalance(sessions, "xid")
+
+       count := 0
+       sessions.Range(func(key, value interface{}) bool {
+               count++
+               return true
+       })
+       assert.Equal(t, 1, count, "Should only have 1 session left")
+       assert.NotNil(t, result)
+       assert.Equal(t, "addr-1", result.RemoteAddr())
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to