This is an automated email from the ASF dual-hosted git repository.
luky116 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-seata-go.git
The following commit(s) were added to refs/heads/master by this push:
new a8752047 implement least active load balance (#602)
a8752047 is described below
commit a875204776bd7317650625781f69e522c529a4b0
Author: R.oldmee <[email protected]>
AuthorDate: Sat Feb 3 19:47:50 2024 +0800
implement least active load balance (#602)
feat: add least active load balance
---
pkg/remoting/getty/getty_remoting.go | 15 ++++-
.../loadbalance/least_active_loadbalance.go | 65 ++++++++++++++++++++++
.../loadbalance/least_active_loadbalance_test.go | 63 +++++++++++++++++++++
pkg/remoting/loadbalance/loadbalance.go | 2 +
.../loadbalance.go => rpc/rpc_status.go} | 61 +++++++++++++-------
.../loadbalance.go => rpc/rpc_status_test.go} | 47 +++++++++-------
6 files changed, 210 insertions(+), 43 deletions(-)
diff --git a/pkg/remoting/getty/getty_remoting.go
b/pkg/remoting/getty/getty_remoting.go
index d0474388..772f6720 100644
--- a/pkg/remoting/getty/getty_remoting.go
+++ b/pkg/remoting/getty/getty_remoting.go
@@ -25,6 +25,7 @@ import (
getty "github.com/apache/dubbo-getty"
"github.com/seata/seata-go/pkg/protocol/message"
+ "github.com/seata/seata-go/pkg/remoting/rpc"
"github.com/seata/seata-go/pkg/util/log"
)
@@ -61,14 +62,26 @@ func (g *GettyRemoting) SendSync(msg message.RpcMessage, s
getty.Session, callba
if s == nil {
s = sessionManager.selectSession(msg)
}
- return g.sendAsync(s, msg, callback)
+ rpc.BeginCount(s.RemoteAddr())
+ result, err := g.sendAsync(s, msg, callback)
+ rpc.EndCount(s.RemoteAddr())
+ if err != nil {
+ log.Errorf("send message: %#v, session: %s", msg, s.Stat())
+ return nil, err
+ }
+ return result, err
}
+// SendASync sends msg through g.sendAsync and returns only the error,
+// discarding sendAsync's first result. When s is nil a session is chosen
+// with sessionManager.selectSession(msg).
+// NOTE(review): s is dereferenced right after selection; confirm that
+// selectSession cannot return nil here.
func (g *GettyRemoting) SendASync(msg message.RpcMessage, s getty.Session,
callback callbackMethod) error {
if s == nil {
s = sessionManager.selectSession(msg)
}
+ // Mark one more in-flight call for this remote address.
+ rpc.BeginCount(s.RemoteAddr())
_, err := g.sendAsync(s, msg, callback)
+ // The call to sendAsync has returned; release the in-flight slot.
+ rpc.EndCount(s.RemoteAddr())
+ if err != nil {
+ log.Errorf("send message: %#v, session: %s", msg, s.Stat())
+ }
return err
}
diff --git a/pkg/remoting/loadbalance/least_active_loadbalance.go
b/pkg/remoting/loadbalance/least_active_loadbalance.go
new file mode 100644
index 00000000..50b98f01
--- /dev/null
+++ b/pkg/remoting/loadbalance/least_active_loadbalance.go
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package loadbalance
+
+import (
+ "math/rand"
+ "sync"
+ "time"
+
+ "github.com/seata/seata-go/pkg/remoting/rpc"
+
+ getty "github.com/apache/dubbo-getty"
+)
+
+// LeastActiveLoadBalance returns the open session whose remote address has
+// the fewest active calls according to rpc.GetStatus. When several sessions
+// tie for the minimum, one of them is chosen at random. Sessions reporting
+// IsClosed are deleted from sessions during the scan. It returns nil when no
+// open session is found. The xid argument is not used by this strategy.
+func LeastActiveLoadBalance(sessions *sync.Map, xid string) getty.Session {
+	var leastActive int32
+	var candidates []getty.Session
+	sessions.Range(func(key, value interface{}) bool {
+		session := key.(getty.Session)
+		if session.IsClosed() {
+			sessions.Delete(session)
+			return true
+		}
+		active := rpc.GetStatus(session.RemoteAddr()).GetActive()
+		switch {
+		// An empty candidate list means "no minimum yet"; this avoids using
+		// -1 as a sentinel, which would collide with a real count of -1.
+		case len(candidates) == 0 || active < leastActive:
+			leastActive = active
+			candidates = append(candidates[:0], session)
+		case active == leastActive:
+			candidates = append(candidates, session)
+		}
+		return true
+	})
+
+	switch len(candidates) {
+	case 0:
+		return nil
+	case 1:
+		return candidates[0]
+	}
+
+	// Break ties randomly so equally loaded sessions share the traffic.
+	picker := rand.New(rand.NewSource(time.Now().UnixNano()))
+	return candidates[picker.Intn(len(candidates))]
+}
diff --git a/pkg/remoting/loadbalance/least_active_loadbalance_test.go
b/pkg/remoting/loadbalance/least_active_loadbalance_test.go
new file mode 100644
index 00000000..a63deb21
--- /dev/null
+++ b/pkg/remoting/loadbalance/least_active_loadbalance_test.go
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package loadbalance
+
+import (
+ "fmt"
+ "strconv"
+ "sync"
+ "testing"
+
+ "github.com/golang/mock/gomock"
+ "github.com/seata/seata-go/pkg/remoting/mock"
+ "github.com/seata/seata-go/pkg/remoting/rpc"
+ "github.com/stretchr/testify/assert"
+)
+
+// TestLeastActiveLoadBalance registers three open sessions with one active
+// call each, one closed session, and then adds a second active call to the
+// first address. The selected session must be open and must not be the one
+// with two active calls.
+func TestLeastActiveLoadBalance(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	sessions := &sync.Map{}
+
+	for i := 1; i <= 3; i++ {
+		session := mock.NewMockTestSession(ctrl)
+		session.EXPECT().IsClosed().Return(false).AnyTimes()
+		addr := "127.0.0." + strconv.Itoa(i) + ":8000"
+		session.EXPECT().RemoteAddr().AnyTimes().DoAndReturn(func() string {
+			return addr
+		})
+		sessions.Store(session, fmt.Sprintf("session-%d", i))
+		rpc.BeginCount(addr)
+	}
+
+	// A closed session must never be returned, whatever its counters say.
+	session := mock.NewMockTestSession(ctrl)
+	session.EXPECT().IsClosed().Return(true).AnyTimes()
+	addr := "127.0.0.5:8000"
+	session.EXPECT().RemoteAddr().AnyTimes().DoAndReturn(func() string {
+		return addr
+	})
+	sessions.Store(session, "session-5")
+	rpc.BeginCount(addr)
+
+	// Give the first address a second active call so it is no longer minimal.
+	countTwo := "127.0.0.1:8000"
+	rpc.BeginCount(countTwo)
+
+	result := LeastActiveLoadBalance(sessions, "test_xid")
+	// Fail cleanly instead of panicking on a nil result.
+	if !assert.NotNil(t, result) {
+		return
+	}
+	assert.NotEqual(t, countTwo, result.RemoteAddr())
+	assert.NotEqual(t, addr, result.RemoteAddr())
+	assert.False(t, result.IsClosed())
+}
diff --git a/pkg/remoting/loadbalance/loadbalance.go
b/pkg/remoting/loadbalance/loadbalance.go
index 8451e170..f867793b 100644
--- a/pkg/remoting/loadbalance/loadbalance.go
+++ b/pkg/remoting/loadbalance/loadbalance.go
@@ -37,6 +37,8 @@ func Select(loadBalanceType string, sessions *sync.Map, xid
string) getty.Sessio
return RandomLoadBalance(sessions, xid)
case xidLoadBalance:
return XidLoadBalance(sessions, xid)
+ case leastActiveLoadBalance:
+ return LeastActiveLoadBalance(sessions, xid)
case roundRobinLoadBalance:
return RoundRobinLoadBalance(sessions, xid)
default:
diff --git a/pkg/remoting/loadbalance/loadbalance.go
b/pkg/remoting/rpc/rpc_status.go
similarity index 50%
copy from pkg/remoting/loadbalance/loadbalance.go
copy to pkg/remoting/rpc/rpc_status.go
index 8451e170..e595237a 100644
--- a/pkg/remoting/loadbalance/loadbalance.go
+++ b/pkg/remoting/rpc/rpc_status.go
@@ -15,31 +15,50 @@
* limitations under the License.
*/
-package loadbalance
+package rpc
import (
"sync"
-
- getty "github.com/apache/dubbo-getty"
+ "sync/atomic"
)
-const (
- randomLoadBalance = "RandomLoadBalance"
- xidLoadBalance = "XID"
- roundRobinLoadBalance = "RoundRobinLoadBalance"
- consistentHashLoadBalance = "ConsistentHashLoadBalance"
- leastActiveLoadBalance = "LeastActiveLoadBalance"
-)
+// serviceStatusMap maps a service key (string) to its *Status; entries are
+// created by GetStatus and removed by RemoveStatus.
+var serviceStatusMap sync.Map
+
+// Status holds the call counters kept for one service key. Both fields are
+// updated with atomic.AddInt32 by BeginCount and EndCount.
+type Status struct {
+ // Active is incremented by BeginCount and decremented by EndCount.
+ Active int32
+ // Total is incremented by EndCount.
+ Total int32
+}
+
+// RemoveStatus deletes the Status stored for service from the status map.
+// A later GetStatus call for the same service stores a fresh Status.
+func RemoveStatus(service string) {
+ serviceStatusMap.Delete(service)
+}
+
+// BeginCount marks the start of a call to service by atomically adding one
+// to that service's Active counter.
+func BeginCount(service string) {
+	atomic.AddInt32(&GetStatus(service).Active, 1)
+}
+
+// EndCount marks the end of a call to service: it atomically subtracts one
+// from the service's Active counter and adds one to its Total counter.
+func EndCount(service string) {
+	counters := GetStatus(service)
+	atomic.AddInt32(&counters.Active, -1)
+	atomic.AddInt32(&counters.Total, 1)
+}
+
+// GetStatus returns the Status tracked for service, storing a zero-valued
+// Status on first use. Concurrent callers for the same service receive the
+// same *Status.
+func GetStatus(service string) *Status {
+	// Fast path: skip allocating a throwaway Status when the entry exists.
+	if v, ok := serviceStatusMap.Load(service); ok {
+		return v.(*Status)
+	}
+	v, _ := serviceStatusMap.LoadOrStore(service, new(Status))
+	return v.(*Status)
+}
+
+// GetActive returns the current value of the Active counter. The value is
+// read atomically because BeginCount and EndCount update it with
+// atomic.AddInt32 from other goroutines.
+func (s *Status) GetActive() int32 {
+	return atomic.LoadInt32(&s.Active)
+}
-func Select(loadBalanceType string, sessions *sync.Map, xid string)
getty.Session {
- switch loadBalanceType {
- case randomLoadBalance:
- return RandomLoadBalance(sessions, xid)
- case xidLoadBalance:
- return XidLoadBalance(sessions, xid)
- case roundRobinLoadBalance:
- return RoundRobinLoadBalance(sessions, xid)
- default:
- return RandomLoadBalance(sessions, xid)
- }
+// GetTotal returns the current value of the Total counter. The value is read
+// atomically because EndCount updates it with atomic.AddInt32.
+func (s *Status) GetTotal() int32 {
+	return atomic.LoadInt32(&s.Total)
}
diff --git a/pkg/remoting/loadbalance/loadbalance.go
b/pkg/remoting/rpc/rpc_status_test.go
similarity index 54%
copy from pkg/remoting/loadbalance/loadbalance.go
copy to pkg/remoting/rpc/rpc_status_test.go
index 8451e170..2e92d267 100644
--- a/pkg/remoting/loadbalance/loadbalance.go
+++ b/pkg/remoting/rpc/rpc_status_test.go
@@ -15,31 +15,36 @@
* limitations under the License.
*/
-package loadbalance
+package rpc
import (
- "sync"
+ "testing"
- getty "github.com/apache/dubbo-getty"
+ "github.com/stretchr/testify/assert"
)
-const (
- randomLoadBalance = "RandomLoadBalance"
- xidLoadBalance = "XID"
- roundRobinLoadBalance = "RoundRobinLoadBalance"
- consistentHashLoadBalance = "ConsistentHashLoadBalance"
- leastActiveLoadBalance = "LeastActiveLoadBalance"
-)
+// service is the status key shared by every test in this file.
+var service = "127.0.0.1:8000"
+
+// TestStatus checks that repeated GetStatus calls for one service return the
+// same *Status instance, not merely equal values.
+func TestStatus(t *testing.T) {
+	rpcStatus1 := GetStatus(service)
+	assert.NotNil(t, rpcStatus1)
+	rpcStatus2 := GetStatus(service)
+	// assert.Equal would compare the pointed-to structs; Same compares pointers.
+	assert.Same(t, rpcStatus1, rpcStatus2)
+}
+
+// TestRemoveStatus checks that RemoveStatus drops the stored entry: the next
+// GetStatus call must hand back a different, zero-valued *Status.
+func TestRemoveStatus(t *testing.T) {
+	old := GetStatus(service)
+	RemoveStatus(service)
+	fresh := GetStatus(service)
+	// A value comparison would pass even if nothing had been removed.
+	assert.NotSame(t, old, fresh)
+	assert.Equal(t, int32(0), fresh.GetActive())
+	assert.Equal(t, int32(0), fresh.GetTotal())
+}
+
+// TestBeginCount checks that BeginCount raises the Active counter by one.
+func TestBeginCount(t *testing.T) {
+	// Reset first so the result does not depend on earlier tests.
+	RemoveStatus(service)
+	BeginCount(service)
+	assert.Equal(t, int32(1), GetStatus(service).GetActive())
+}
-func Select(loadBalanceType string, sessions *sync.Map, xid string)
getty.Session {
- switch loadBalanceType {
- case randomLoadBalance:
- return RandomLoadBalance(sessions, xid)
- case xidLoadBalance:
- return XidLoadBalance(sessions, xid)
- case roundRobinLoadBalance:
- return RoundRobinLoadBalance(sessions, xid)
- default:
- return RandomLoadBalance(sessions, xid)
- }
+// TestEndCount checks that EndCount lowers Active by one and raises Total by
+// one for a call that was previously begun.
+func TestEndCount(t *testing.T) {
+	// Build the precondition here instead of relying on TestBeginCount
+	// having run first.
+	RemoveStatus(service)
+	BeginCount(service)
+	EndCount(service)
+	assert.Equal(t, int32(0), GetStatus(service).GetActive())
+	assert.Equal(t, int32(1), GetStatus(service).GetTotal())
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]