This is an automated email from the ASF dual-hosted git repository.

jianbin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-seata-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 560829cd feat: add round robin in remote module (#622)
560829cd is described below

commit 560829cd6bf0aa13f02b698c1551f2cd4b6247fd
Author: Jingliu Xiong <jingliu.xi...@vesoft.com>
AuthorDate: Thu Dec 28 15:38:26 2023 +0800

    feat: add round robin in remote module (#622)
---
 pkg/remoting/loadbalance/loadbalance.go            |   2 +
 .../loadbalance/round_robin_loadbalance.go         |  69 ++++++++++++++
 .../loadbalance/round_robin_loadbalance_test.go    | 100 +++++++++++++++++++++
 3 files changed, 171 insertions(+)

diff --git a/pkg/remoting/loadbalance/loadbalance.go 
b/pkg/remoting/loadbalance/loadbalance.go
index c5ddb679..8451e170 100644
--- a/pkg/remoting/loadbalance/loadbalance.go
+++ b/pkg/remoting/loadbalance/loadbalance.go
@@ -37,6 +37,8 @@ func Select(loadBalanceType string, sessions *sync.Map, xid 
string) getty.Sessio
                return RandomLoadBalance(sessions, xid)
        case xidLoadBalance:
                return XidLoadBalance(sessions, xid)
+       case roundRobinLoadBalance:
+               return RoundRobinLoadBalance(sessions, xid)
        default:
                return RandomLoadBalance(sessions, xid)
        }
diff --git a/pkg/remoting/loadbalance/round_robin_loadbalance.go 
b/pkg/remoting/loadbalance/round_robin_loadbalance.go
new file mode 100644
index 00000000..9cebc926
--- /dev/null
+++ b/pkg/remoting/loadbalance/round_robin_loadbalance.go
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package loadbalance
+
+import (
+       "math"
+       "sort"
+       "sync"
+       "sync/atomic"
+
+       getty "github.com/apache/dubbo-getty"
+)
+
+var sequence int32
+
+func RoundRobinLoadBalance(sessions *sync.Map, s string) getty.Session {
+       // collect sync.Map adderToSession
+       // filter out closed session instance
+       adderToSession := make(map[string]getty.Session, 0)
+       // map has no sequence, we should sort it to make sure the sequence is 
always the same
+       adders := make([]string, 0)
+       sessions.Range(func(key, value interface{}) bool {
+               session := key.(getty.Session)
+               if session.IsClosed() {
+                       sessions.Delete(key)
+               } else {
+                       adderToSession[session.RemoteAddr()] = session
+                       adders = append(adders, session.RemoteAddr())
+               }
+               return true
+       })
+       sort.Strings(adders)
+       // adderToSession eq 0 means there are no available session
+       if len(adderToSession) == 0 {
+               return nil
+       }
+       index := getPositiveSequence() % len(adderToSession)
+       return adderToSession[adders[index]]
+}
+
+func getPositiveSequence() int {
+       for {
+               current := atomic.LoadInt32(&sequence)
+               var next int32
+               if current == math.MaxInt32 {
+                       next = 0
+               } else {
+                       next = current + 1
+               }
+               if atomic.CompareAndSwapInt32(&sequence, current, next) {
+                       return int(current)
+               }
+       }
+}
diff --git a/pkg/remoting/loadbalance/round_robin_loadbalance_test.go 
b/pkg/remoting/loadbalance/round_robin_loadbalance_test.go
new file mode 100644
index 00000000..c5826570
--- /dev/null
+++ b/pkg/remoting/loadbalance/round_robin_loadbalance_test.go
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package loadbalance
+
+import (
+       "fmt"
+       "math"
+       "sync"
+       "testing"
+
+       "github.com/golang/mock/gomock"
+       "github.com/stretchr/testify/assert"
+
+       "github.com/seata/seata-go/pkg/remoting/mock"
+)
+
+func TestRoundRobinLoadBalance_Normal(t *testing.T) {
+       ctrl := gomock.NewController(t)
+       sessions := &sync.Map{}
+
+       for i := 0; i < 10; i++ {
+               session := mock.NewMockTestSession(ctrl)
+               session.EXPECT().IsClosed().Return(i == 2).AnyTimes()
+               session.EXPECT().RemoteAddr().Return(fmt.Sprintf("%d", 
i)).AnyTimes()
+               sessions.Store(session, fmt.Sprintf("session-%d", i+1))
+       }
+
+       for i := 0; i < 10; i++ {
+               if i == 2 {
+                       continue
+               }
+               result := RoundRobinLoadBalance(sessions, "some_xid")
+               assert.Equal(t, fmt.Sprintf("%d", i), result.RemoteAddr())
+               assert.NotNil(t, result)
+               assert.False(t, result.IsClosed())
+       }
+}
+
+func TestRoundRobinLoadBalance_OverSequence(t *testing.T) {
+       ctrl := gomock.NewController(t)
+       sessions := &sync.Map{}
+       sequence = math.MaxInt32
+
+       for i := 0; i < 10; i++ {
+               session := mock.NewMockTestSession(ctrl)
+               session.EXPECT().IsClosed().Return(false).AnyTimes()
+               session.EXPECT().RemoteAddr().Return(fmt.Sprintf("%d", 
i)).AnyTimes()
+               sessions.Store(session, fmt.Sprintf("session-%d", i+1))
+       }
+
+       for i := 0; i < 10; i++ {
+               // over sequence here
+               if i == 0 {
+                       result := RoundRobinLoadBalance(sessions, "some_xid")
+                       assert.Equal(t, "7", result.RemoteAddr())
+                       assert.NotNil(t, result)
+                       assert.False(t, result.IsClosed())
+                       continue
+               }
+               result := RoundRobinLoadBalance(sessions, "some_xid")
+               assert.Equal(t, fmt.Sprintf("%d", i-1), result.RemoteAddr())
+               assert.NotNil(t, result)
+               assert.False(t, result.IsClosed())
+       }
+}
+
+func TestRoundRobinLoadBalance_All_Closed(t *testing.T) {
+       ctrl := gomock.NewController(t)
+       sessions := &sync.Map{}
+       for i := 0; i < 10; i++ {
+               session := mock.NewMockTestSession(ctrl)
+               session.EXPECT().IsClosed().Return(true).AnyTimes()
+               sessions.Store(session, fmt.Sprintf("session-%d", i+1))
+       }
+       if result := RoundRobinLoadBalance(sessions, "some_xid"); result != nil 
{
+               t.Errorf("Expected nil, actual got %+v", result)
+       }
+}
+
+func TestRoundRobinLoadBalance_Empty(t *testing.T) {
+       sessions := &sync.Map{}
+       if result := RoundRobinLoadBalance(sessions, "some_xid"); result != nil 
{
+               t.Errorf("Expected nil, actual got %+v", result)
+       }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org
For additional commands, e-mail: notifications-h...@seata.apache.org

Reply via email to