This is an automated email from the ASF dual-hosted git repository.

luky116 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-seata-go.git


The following commit(s) were added to refs/heads/master by this push:
     new a8752047 implement least active load balance (#602)
a8752047 is described below

commit a875204776bd7317650625781f69e522c529a4b0
Author: R.oldmee <[email protected]>
AuthorDate: Sat Feb 3 19:47:50 2024 +0800

    implement least active load balance (#602)
    
    feat: add least active load balance
---
 pkg/remoting/getty/getty_remoting.go               | 15 ++++-
 .../loadbalance/least_active_loadbalance.go        | 65 ++++++++++++++++++++++
 .../loadbalance/least_active_loadbalance_test.go   | 63 +++++++++++++++++++++
 pkg/remoting/loadbalance/loadbalance.go            |  2 +
 .../loadbalance.go => rpc/rpc_status.go}           | 61 +++++++++++++-------
 .../loadbalance.go => rpc/rpc_status_test.go}      | 47 +++++++++-------
 6 files changed, 210 insertions(+), 43 deletions(-)

diff --git a/pkg/remoting/getty/getty_remoting.go b/pkg/remoting/getty/getty_remoting.go
index d0474388..772f6720 100644
--- a/pkg/remoting/getty/getty_remoting.go
+++ b/pkg/remoting/getty/getty_remoting.go
@@ -25,6 +25,7 @@ import (
        getty "github.com/apache/dubbo-getty"
 
        "github.com/seata/seata-go/pkg/protocol/message"
+       "github.com/seata/seata-go/pkg/remoting/rpc"
        "github.com/seata/seata-go/pkg/util/log"
 )
 
@@ -61,14 +62,26 @@ func (g *GettyRemoting) SendSync(msg message.RpcMessage, s getty.Session, callba
        if s == nil {
                s = sessionManager.selectSession(msg)
        }
-       return g.sendAsync(s, msg, callback)
+       rpc.BeginCount(s.RemoteAddr())
+       result, err := g.sendAsync(s, msg, callback)
+       rpc.EndCount(s.RemoteAddr())
+       if err != nil {
+               log.Errorf("send message: %#v, session: %s", msg, s.Stat())
+               return nil, err
+       }
+       return result, err
 }
 
 func (g *GettyRemoting) SendASync(msg message.RpcMessage, s getty.Session, callback callbackMethod) error {
        if s == nil {
                s = sessionManager.selectSession(msg)
        }
+       rpc.BeginCount(s.RemoteAddr())
        _, err := g.sendAsync(s, msg, callback)
+       rpc.EndCount(s.RemoteAddr())
+       if err != nil {
+               log.Errorf("send message: %#v, session: %s", msg, s.Stat())
+       }
        return err
 }
 
diff --git a/pkg/remoting/loadbalance/least_active_loadbalance.go b/pkg/remoting/loadbalance/least_active_loadbalance.go
new file mode 100644
index 00000000..50b98f01
--- /dev/null
+++ b/pkg/remoting/loadbalance/least_active_loadbalance.go
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package loadbalance
+
+import (
+       "math/rand"
+       "sync"
+       "time"
+
+       "github.com/seata/seata-go/pkg/remoting/rpc"
+
+       getty "github.com/apache/dubbo-getty"
+)
+
+func LeastActiveLoadBalance(sessions *sync.Map, xid string) getty.Session {
+       var session getty.Session
+       var leastActive int32 = -1
+       leastCount := 0
+       var leastIndexes []getty.Session
+       sessions.Range(func(key, value interface{}) bool {
+               session = key.(getty.Session)
+               if session.IsClosed() {
+                       sessions.Delete(session)
+               } else {
+                       active := rpc.GetStatus(session.RemoteAddr()).GetActive()
+                       if leastActive == -1 || active < leastActive {
+                               leastActive = active
+                               leastCount = 1
+                               if len(leastIndexes) > 0 {
+                                       leastIndexes = leastIndexes[:0]
+                               }
+                               leastIndexes = append(leastIndexes, session)
+                       } else if active == leastActive {
+                               leastIndexes = append(leastIndexes, session)
+                               leastCount++
+                       }
+               }
+               return true
+       })
+
+       if leastCount == 0 {
+               return nil
+       }
+
+       if leastCount == 1 {
+               return leastIndexes[0]
+       } else {
+               return leastIndexes[rand.New(rand.NewSource(time.Now().UnixNano())).Intn(leastCount)]
+       }
+}
diff --git a/pkg/remoting/loadbalance/least_active_loadbalance_test.go b/pkg/remoting/loadbalance/least_active_loadbalance_test.go
new file mode 100644
index 00000000..a63deb21
--- /dev/null
+++ b/pkg/remoting/loadbalance/least_active_loadbalance_test.go
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package loadbalance
+
+import (
+       "fmt"
+       "strconv"
+       "sync"
+       "testing"
+
+       "github.com/golang/mock/gomock"
+       "github.com/seata/seata-go/pkg/remoting/mock"
+       "github.com/seata/seata-go/pkg/remoting/rpc"
+       "github.com/stretchr/testify/assert"
+)
+
+func TestLeastActiveLoadBalance(t *testing.T) {
+       ctrl := gomock.NewController(t)
+       sessions := &sync.Map{}
+
+       for i := 1; i <= 3; i++ {
+               session := mock.NewMockTestSession(ctrl)
+               session.EXPECT().IsClosed().Return(false).AnyTimes()
+               addr := "127.0.0." + strconv.Itoa(i) + ":8000"
+               session.EXPECT().RemoteAddr().AnyTimes().DoAndReturn(func() string {
+                       return addr
+               })
+               sessions.Store(session, fmt.Sprintf("session-%d", i))
+               rpc.BeginCount(addr)
+       }
+
+       session := mock.NewMockTestSession(ctrl)
+       session.EXPECT().IsClosed().Return(true).AnyTimes()
+       addr := "127.0.0.5:8000"
+       session.EXPECT().RemoteAddr().AnyTimes().DoAndReturn(func() string {
+               return addr
+       })
+       sessions.Store(session, "session-5")
+       rpc.BeginCount(addr)
+
+       countTwo := "127.0.0.1:8000"
+       rpc.BeginCount(countTwo)
+
+       result := LeastActiveLoadBalance(sessions, "test_xid")
+       assert.False(t, result.RemoteAddr() == countTwo)
+       assert.False(t, result.RemoteAddr() == addr)
+       assert.False(t, result.IsClosed())
+}
diff --git a/pkg/remoting/loadbalance/loadbalance.go b/pkg/remoting/loadbalance/loadbalance.go
index 8451e170..f867793b 100644
--- a/pkg/remoting/loadbalance/loadbalance.go
+++ b/pkg/remoting/loadbalance/loadbalance.go
@@ -37,6 +37,8 @@ func Select(loadBalanceType string, sessions *sync.Map, xid string) getty.Sessio
                return RandomLoadBalance(sessions, xid)
        case xidLoadBalance:
                return XidLoadBalance(sessions, xid)
+       case leastActiveLoadBalance:
+               return LeastActiveLoadBalance(sessions, xid)    
        case roundRobinLoadBalance:
                return RoundRobinLoadBalance(sessions, xid)
        default:
diff --git a/pkg/remoting/loadbalance/loadbalance.go b/pkg/remoting/rpc/rpc_status.go
similarity index 50%
copy from pkg/remoting/loadbalance/loadbalance.go
copy to pkg/remoting/rpc/rpc_status.go
index 8451e170..e595237a 100644
--- a/pkg/remoting/loadbalance/loadbalance.go
+++ b/pkg/remoting/rpc/rpc_status.go
@@ -15,31 +15,50 @@
  * limitations under the License.
  */
 
-package loadbalance
+package rpc
 
 import (
        "sync"
-
-       getty "github.com/apache/dubbo-getty"
+       "sync/atomic"
 )
 
-const (
-       randomLoadBalance         = "RandomLoadBalance"
-       xidLoadBalance            = "XID"
-       roundRobinLoadBalance     = "RoundRobinLoadBalance"
-       consistentHashLoadBalance = "ConsistentHashLoadBalance"
-       leastActiveLoadBalance    = "LeastActiveLoadBalance"
-)
+var serviceStatusMap sync.Map
+
+type Status struct {
+       Active int32
+       Total  int32
+}
+
+// RemoveStatus remove the RpcStatus of this service
+func RemoveStatus(service string) {
+       serviceStatusMap.Delete(service)
+}
+
+// BeginCount begin count
+func BeginCount(service string) {
+       status := GetStatus(service)
+       atomic.AddInt32(&status.Active, 1)
+}
+
+// EndCount end count
+func EndCount(service string) {
+       status := GetStatus(service)
+       atomic.AddInt32(&status.Active, -1)
+       atomic.AddInt32(&status.Total, 1)
+}
+
+// GetStatus get status
+func GetStatus(service string) *Status {
+       a, _ := serviceStatusMap.LoadOrStore(service, new(Status))
+       return a.(*Status)
+}
+
+// GetActive get active.
+func (s *Status) GetActive() int32 {
+       return s.Active
+}
 
-func Select(loadBalanceType string, sessions *sync.Map, xid string) getty.Session {
-       switch loadBalanceType {
-       case randomLoadBalance:
-               return RandomLoadBalance(sessions, xid)
-       case xidLoadBalance:
-               return XidLoadBalance(sessions, xid)
-       case roundRobinLoadBalance:
-               return RoundRobinLoadBalance(sessions, xid)
-       default:
-               return RandomLoadBalance(sessions, xid)
-       }
+// GetTotal get total.
+func (s *Status) GetTotal() int32 {
+       return s.Total
 }
diff --git a/pkg/remoting/loadbalance/loadbalance.go b/pkg/remoting/rpc/rpc_status_test.go
similarity index 54%
copy from pkg/remoting/loadbalance/loadbalance.go
copy to pkg/remoting/rpc/rpc_status_test.go
index 8451e170..2e92d267 100644
--- a/pkg/remoting/loadbalance/loadbalance.go
+++ b/pkg/remoting/rpc/rpc_status_test.go
@@ -15,31 +15,36 @@
  * limitations under the License.
  */
 
-package loadbalance
+package rpc
 
 import (
-       "sync"
+       "testing"
 
-       getty "github.com/apache/dubbo-getty"
+       "github.com/stretchr/testify/assert"
 )
 
-const (
-       randomLoadBalance         = "RandomLoadBalance"
-       xidLoadBalance            = "XID"
-       roundRobinLoadBalance     = "RoundRobinLoadBalance"
-       consistentHashLoadBalance = "ConsistentHashLoadBalance"
-       leastActiveLoadBalance    = "LeastActiveLoadBalance"
-)
+var service = "127.0.0.1:8000"
+
+func TestStatus(t *testing.T) {
+       rpcStatus1 := GetStatus(service)
+       assert.NotNil(t, rpcStatus1)
+       rpcStatus2 := GetStatus(service)
+       assert.Equal(t, rpcStatus1, rpcStatus2)
+}
+
+func TestRemoveStatus(t *testing.T) {
+       old := GetStatus(service)
+       RemoveStatus(service)
+       assert.Equal(t, GetStatus(service), old)
+}
+
+func TestBeginCount(t *testing.T) {
+       BeginCount(service)
+       assert.Equal(t, GetStatus(service).GetActive(), int32(1))
+}
 
-func Select(loadBalanceType string, sessions *sync.Map, xid string) getty.Session {
-       switch loadBalanceType {
-       case randomLoadBalance:
-               return RandomLoadBalance(sessions, xid)
-       case xidLoadBalance:
-               return XidLoadBalance(sessions, xid)
-       case roundRobinLoadBalance:
-               return RoundRobinLoadBalance(sessions, xid)
-       default:
-               return RandomLoadBalance(sessions, xid)
-       }
+func TestEndCount(t *testing.T) {
+       EndCount(service)
+       assert.Equal(t, GetStatus(service).GetActive(), int32(0))
+       assert.Equal(t, GetStatus(service).GetTotal(), int32(1))
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to