This is an automated email from the ASF dual-hosted git repository.

luky116 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-seata-go.git


The following commit(s) were added to refs/heads/master by this push:
     new e18861d2 feat: add consist load balance (#600)
e18861d2 is described below

commit e18861d25adf3053b04bd7a4f6d82ff99fcc4950
Author: wang1309 <[email protected]>
AuthorDate: Sat Feb 17 18:47:43 2024 +0800

    feat: add consist load balance (#600)
    
    add consist load balance
---
 pkg/client/config.go                               |   1 +
 pkg/remoting/getty/rpc_client.go                   |   1 +
 .../loadbalance/consistent_hash_loadbalance.go     | 162 +++++++++++++++++++++
 ...ance.go => consistent_hash_loadbalance_test.go} |  45 +++---
 pkg/remoting/loadbalance/loadbalance.go            |   2 +
 .../loadbalance/random_loadbalance_test.go         |   3 +-
 pkg/remoting/loadbalance/xid_loadbalance_test.go   |   3 +-
 7 files changed, 195 insertions(+), 22 deletions(-)

diff --git a/pkg/client/config.go b/pkg/client/config.go
index bd84852d..4c56be27 100644
--- a/pkg/client/config.go
+++ b/pkg/client/config.go
@@ -31,6 +31,7 @@ import (
        "github.com/knadh/koanf/parsers/toml"
        "github.com/knadh/koanf/parsers/yaml"
        "github.com/knadh/koanf/providers/rawbytes"
+
        "github.com/seata/seata-go/pkg/discovery"
 
        "github.com/seata/seata-go/pkg/datasource/sql"
diff --git a/pkg/remoting/getty/rpc_client.go b/pkg/remoting/getty/rpc_client.go
index 601064ef..7311876b 100644
--- a/pkg/remoting/getty/rpc_client.go
+++ b/pkg/remoting/getty/rpc_client.go
@@ -25,6 +25,7 @@ import (
 
        getty "github.com/apache/dubbo-getty"
        gxsync "github.com/dubbogo/gost/sync"
+
        "github.com/seata/seata-go/pkg/discovery"
        "github.com/seata/seata-go/pkg/protocol/codec"
        "github.com/seata/seata-go/pkg/remoting/config"
diff --git a/pkg/remoting/loadbalance/consistent_hash_loadbalance.go 
b/pkg/remoting/loadbalance/consistent_hash_loadbalance.go
new file mode 100644
index 00000000..626a0104
--- /dev/null
+++ b/pkg/remoting/loadbalance/consistent_hash_loadbalance.go
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package loadbalance
+
+import (
+       "crypto/md5"
+       "fmt"
+       "sort"
+       "sync"
+
+       getty "github.com/apache/dubbo-getty"
+)
+
+var (
+       once                     sync.Once
+       defaultVirtualNodeNumber = 10
+       consistentInstance       *Consistent
+)
+
+type Consistent struct {
+       sync.RWMutex
+       virtualNodeCount int
+       // consistent hashCircle
+       hashCircle      map[int64]getty.Session
+       sortedHashNodes []int64
+}
+
+func (c *Consistent) put(key int64, session getty.Session) {
+       c.Lock()
+       defer c.Unlock()
+       c.hashCircle[key] = session
+}
+
+func (c *Consistent) hash(key string) int64 {
+       hashByte := md5.Sum([]byte(key))
+       var res int64
+       for i := 0; i < 4; i++ {
+               res <<= 8
+               res |= int64(hashByte[i]) & 0xff
+       }
+
+       return res
+}
+
+// pick get a  node
+func (c *Consistent) pick(sessions *sync.Map, key string) getty.Session {
+       hashKey := c.hash(key)
+       index := sort.Search(len(c.sortedHashNodes), func(i int) bool {
+               return c.sortedHashNodes[i] >= hashKey
+       })
+
+       if index == len(c.sortedHashNodes) {
+               return RandomLoadBalance(sessions, key)
+       }
+
+       c.RLock()
+       session, ok := c.hashCircle[c.sortedHashNodes[index]]
+       if !ok {
+               c.RUnlock()
+               return RandomLoadBalance(sessions, key)
+       }
+       c.RUnlock()
+
+       if session.IsClosed() {
+               go c.refreshHashCircle(sessions)
+               return c.firstKey()
+       }
+
+       return session
+}
+
+// refreshHashCircle refresh hashCircle
+func (c *Consistent) refreshHashCircle(sessions *sync.Map) {
+       var sortedHashNodes []int64
+       hashCircle := make(map[int64]getty.Session)
+       var session getty.Session
+       sessions.Range(func(key, value interface{}) bool {
+               session = key.(getty.Session)
+               for i := 0; i < defaultVirtualNodeNumber; i++ {
+                       if !session.IsClosed() {
+                               position := c.hash(fmt.Sprintf("%s%d", 
session.RemoteAddr(), i))
+                               hashCircle[position] = session
+                               sortedHashNodes = append(sortedHashNodes, 
position)
+                       } else {
+                               sessions.Delete(key)
+                       }
+               }
+               return true
+       })
+
+       // virtual node sort
+       sort.Slice(sortedHashNodes, func(i, j int) bool {
+               return sortedHashNodes[i] < sortedHashNodes[j]
+       })
+
+       c.sortedHashNodes = sortedHashNodes
+       c.hashCircle = hashCircle
+}
+
+func (c *Consistent) firstKey() getty.Session {
+       c.RLock()
+       defer c.RUnlock()
+
+       if len(c.sortedHashNodes) > 0 {
+               return c.hashCircle[c.sortedHashNodes[0]]
+       }
+
+       return nil
+}
+
+func newConsistenceInstance(sessions *sync.Map) *Consistent {
+       once.Do(func() {
+               consistentInstance = &Consistent{
+                       hashCircle: make(map[int64]getty.Session),
+               }
+               // construct hash circle
+               sessions.Range(func(key, value interface{}) bool {
+                       session := key.(getty.Session)
+                       for i := 0; i < defaultVirtualNodeNumber; i++ {
+                               if !session.IsClosed() {
+                                       position := 
consistentInstance.hash(fmt.Sprintf("%s%d", session.RemoteAddr(), i))
+                                       consistentInstance.put(position, 
session)
+                                       consistentInstance.sortedHashNodes = 
append(consistentInstance.sortedHashNodes, position)
+                               } else {
+                                       sessions.Delete(key)
+                               }
+                       }
+                       return true
+               })
+
+               // virtual node sort
+               sort.Slice(consistentInstance.sortedHashNodes, func(i, j int) 
bool {
+                       return consistentInstance.sortedHashNodes[i] < 
consistentInstance.sortedHashNodes[j]
+               })
+       })
+
+       return consistentInstance
+}
+
+func ConsistentHashLoadBalance(sessions *sync.Map, xid string) getty.Session {
+       if consistentInstance == nil {
+               newConsistenceInstance(sessions)
+       }
+
+       // pick a node
+       return consistentInstance.pick(sessions, xid)
+}
diff --git a/pkg/remoting/loadbalance/loadbalance.go 
b/pkg/remoting/loadbalance/consistent_hash_loadbalance_test.go
similarity index 52%
copy from pkg/remoting/loadbalance/loadbalance.go
copy to pkg/remoting/loadbalance/consistent_hash_loadbalance_test.go
index f867793b..3fc5b509 100644
--- a/pkg/remoting/loadbalance/loadbalance.go
+++ b/pkg/remoting/loadbalance/consistent_hash_loadbalance_test.go
@@ -18,30 +18,35 @@
 package loadbalance
 
 import (
+       "fmt"
        "sync"
+       "testing"
 
-       getty "github.com/apache/dubbo-getty"
-)
+       "github.com/golang/mock/gomock"
+       "github.com/stretchr/testify/assert"
 
-const (
-       randomLoadBalance         = "RandomLoadBalance"
-       xidLoadBalance            = "XID"
-       roundRobinLoadBalance     = "RoundRobinLoadBalance"
-       consistentHashLoadBalance = "ConsistentHashLoadBalance"
-       leastActiveLoadBalance    = "LeastActiveLoadBalance"
+       "github.com/seata/seata-go/pkg/remoting/mock"
 )
 
-func Select(loadBalanceType string, sessions *sync.Map, xid string) 
getty.Session {
-       switch loadBalanceType {
-       case randomLoadBalance:
-               return RandomLoadBalance(sessions, xid)
-       case xidLoadBalance:
-               return XidLoadBalance(sessions, xid)
-       case leastActiveLoadBalance:
-               return LeastActiveLoadBalance(sessions, xid)    
-       case roundRobinLoadBalance:
-               return RoundRobinLoadBalance(sessions, xid)
-       default:
-               return RandomLoadBalance(sessions, xid)
+func TestConsistentHashLoadBalance(t *testing.T) {
+       ctrl := gomock.NewController(t)
+       sessions := &sync.Map{}
+
+       for i := 0; i < 3; i++ {
+               session := mock.NewMockTestSession(ctrl)
+               session.EXPECT().IsClosed().Return(false).AnyTimes()
+               session.EXPECT().RemoteAddr().AnyTimes().DoAndReturn(func() 
string {
+                       return "127.0.0.1:8000"
+               })
+               sessions.Store(session, fmt.Sprintf("session-%d", i))
        }
+
+       result := ConsistentHashLoadBalance(sessions, "test_xid")
+       assert.NotNil(t, result)
+       assert.False(t, result.IsClosed())
+
+       sessions.Range(func(key, value interface{}) bool {
+               t.Logf("key: %v, value: %v", key, value)
+               return true
+       })
 }
diff --git a/pkg/remoting/loadbalance/loadbalance.go 
b/pkg/remoting/loadbalance/loadbalance.go
index f867793b..5704eb39 100644
--- a/pkg/remoting/loadbalance/loadbalance.go
+++ b/pkg/remoting/loadbalance/loadbalance.go
@@ -37,6 +37,8 @@ func Select(loadBalanceType string, sessions *sync.Map, xid 
string) getty.Sessio
                return RandomLoadBalance(sessions, xid)
        case xidLoadBalance:
                return XidLoadBalance(sessions, xid)
+       case consistentHashLoadBalance:
+               return ConsistentHashLoadBalance(sessions, xid)
        case leastActiveLoadBalance:
                return LeastActiveLoadBalance(sessions, xid)    
        case roundRobinLoadBalance:
diff --git a/pkg/remoting/loadbalance/random_loadbalance_test.go 
b/pkg/remoting/loadbalance/random_loadbalance_test.go
index 5db9c882..e63a74cb 100644
--- a/pkg/remoting/loadbalance/random_loadbalance_test.go
+++ b/pkg/remoting/loadbalance/random_loadbalance_test.go
@@ -23,8 +23,9 @@ import (
        "testing"
 
        "github.com/golang/mock/gomock"
-       "github.com/seata/seata-go/pkg/remoting/mock"
        "github.com/stretchr/testify/assert"
+
+       "github.com/seata/seata-go/pkg/remoting/mock"
 )
 
 func TestRandomLoadBalance_Normal(t *testing.T) {
diff --git a/pkg/remoting/loadbalance/xid_loadbalance_test.go 
b/pkg/remoting/loadbalance/xid_loadbalance_test.go
index d361f338..cd47cdd8 100644
--- a/pkg/remoting/loadbalance/xid_loadbalance_test.go
+++ b/pkg/remoting/loadbalance/xid_loadbalance_test.go
@@ -22,8 +22,9 @@ import (
        "testing"
 
        "github.com/golang/mock/gomock"
-       "github.com/seata/seata-go/pkg/remoting/mock"
        "github.com/stretchr/testify/assert"
+
+       "github.com/seata/seata-go/pkg/remoting/mock"
 )
 
 func TestXidLoadBalance(t *testing.T) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to