This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new bd8384ca Assign a separate lookup table to each group (#482)
bd8384ca is described below
commit bd8384ca5f66838f51a878668ef2a3785a806565
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon Jul 1 17:16:30 2024 +0800
Assign a separate lookup table to each group (#482)
---
CHANGES.md | 1 +
pkg/cmdsetup/liaison.go | 5 +---
pkg/node/interface.go | 5 ----
pkg/node/maglev.go | 69 ++++++++++++++++++++++++++++++++++++++++---------
pkg/node/maglev_test.go | 12 ++++-----
5 files changed, 64 insertions(+), 28 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 82ca6e08..408a8b29 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -9,6 +9,7 @@ Release Notes.
- Check unregistered nodes in background.
- Improve sorting performance of stream.
- Add the measure query trace.
+- Assign a separate lookup table to each group in the maglev selector.
### Bugs
diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go
index a7112e94..67cd942a 100644
--- a/pkg/cmdsetup/liaison.go
+++ b/pkg/cmdsetup/liaison.go
@@ -46,10 +46,7 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
}
pipeline := pub.New(metaSvc)
localPipeline := queue.Local()
- nodeSel, err := node.NewMaglevSelector()
- if err != nil {
- l.Fatal().Err(err).Msg("failed to initiate required node
selector")
- }
+ nodeSel := node.NewMaglevSelector()
nodeRegistry := grpc.NewClusterNodeRegistry(pipeline, nodeSel)
grpcServer := grpc.NewServer(ctx, pipeline, localPipeline, metaSvc,
nodeRegistry)
profSvc := observability.NewProfService()
diff --git a/pkg/node/interface.go b/pkg/node/interface.go
index 7745c9ad..26ec5cc2 100644
--- a/pkg/node/interface.go
+++ b/pkg/node/interface.go
@@ -19,7 +19,6 @@
package node
import (
- "strconv"
"sync"
"github.com/pkg/errors"
@@ -97,7 +96,3 @@ func (p *pickFirstSelector) Pick(_, _ string, _ uint32)
(string, error) {
}
return p.nodeIDs[0], nil
}
-
-func formatSearchKey(group, name string, shardID uint32) string {
- return group + "/" + name + "#" + strconv.FormatUint(uint64(shardID),
10)
-}
diff --git a/pkg/node/maglev.go b/pkg/node/maglev.go
index deb140f2..fea2c5b2 100644
--- a/pkg/node/maglev.go
+++ b/pkg/node/maglev.go
@@ -18,36 +18,81 @@
package node
import (
+ "sort"
+ "strconv"
+ "sync"
+
"github.com/kkdai/maglev"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
)
+const lookupTableSize = 65537
+
var _ Selector = (*maglevSelector)(nil)
type maglevSelector struct {
- maglev *maglev.Maglev
+ routers sync.Map
+ nodes []string
+ mutex sync.RWMutex
}
func (m *maglevSelector) AddNode(node *databasev1.Node) {
- _ = m.maglev.Add(node.GetMetadata().GetName())
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+ for i := range m.nodes {
+ if m.nodes[i] == node.GetMetadata().GetName() {
+ return
+ }
+ }
+ m.nodes = append(m.nodes, node.GetMetadata().GetName())
+ sort.StringSlice(m.nodes).Sort()
+ m.routers.Range(func(_, value any) bool {
+ _ = value.(*maglev.Maglev).Set(m.nodes)
+ return true
+ })
}
func (m *maglevSelector) RemoveNode(node *databasev1.Node) {
- _ = m.maglev.Remove(node.GetMetadata().GetName())
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+ for i := range m.nodes {
+ if m.nodes[i] == node.GetMetadata().GetName() {
+ m.nodes = append(m.nodes[:i], m.nodes[i+1:]...)
+ break
+ }
+ }
+ m.routers.Range(func(_, value any) bool {
+ _ = value.(*maglev.Maglev).Set(m.nodes)
+ return true
+ })
}
func (m *maglevSelector) Pick(group, name string, shardID uint32) (string,
error) {
- return m.maglev.Get(formatSearchKey(group, name, shardID))
-}
+ router, ok := m.routers.Load(group)
+ if ok {
+ return router.(*maglev.Maglev).Get(formatSearchKey(name,
shardID))
+ }
+ m.mutex.Lock()
+ defer m.mutex.Unlock()
+ router, ok = m.routers.Load(group)
+ if ok {
+ return router.(*maglev.Maglev).Get(formatSearchKey(name,
shardID))
+ }
-// NewMaglevSelector creates a new backend selector based on Maglev hashing
algorithm.
-func NewMaglevSelector() (Selector, error) {
- alg, err := maglev.NewMaglev(nil, 65537)
+ mTab, err := maglev.NewMaglev(m.nodes, lookupTableSize)
if err != nil {
- return nil, err
+ return "", err
}
- return &maglevSelector{
- maglev: alg,
- }, nil
+ m.routers.Store(group, mTab)
+ return mTab.Get(formatSearchKey(name, shardID))
+}
+
+// NewMaglevSelector creates a new backend selector based on Maglev hashing
algorithm.
+func NewMaglevSelector() Selector {
+ return &maglevSelector{}
+}
+
+func formatSearchKey(name string, shardID uint32) string {
+ return name + "-" + strconv.FormatUint(uint64(shardID), 10)
}
diff --git a/pkg/node/maglev_test.go b/pkg/node/maglev_test.go
index d581dd03..74505a3a 100644
--- a/pkg/node/maglev_test.go
+++ b/pkg/node/maglev_test.go
@@ -34,8 +34,7 @@ const (
)
func TestMaglevSelector(t *testing.T) {
- sel, err := NewMaglevSelector()
- assert.NoError(t, err)
+ sel := NewMaglevSelector()
sel.AddNode(&databasev1.Node{
Metadata: &commonv1.Metadata{
Name: "data-node-1",
@@ -55,8 +54,7 @@ func TestMaglevSelector(t *testing.T) {
}
func TestMaglevSelector_EvenDistribution(t *testing.T) {
- sel, err := NewMaglevSelector()
- assert.NoError(t, err)
+ sel := NewMaglevSelector()
dataNodeNum := 10
for i := 0; i < dataNodeNum; i++ {
sel.AddNode(&databasev1.Node{
@@ -83,8 +81,8 @@ func TestMaglevSelector_EvenDistribution(t *testing.T) {
}
func TestMaglevSelector_DiffNode(t *testing.T) {
- fullSel, _ := NewMaglevSelector()
- brokenSel, _ := NewMaglevSelector()
+ fullSel := NewMaglevSelector()
+ brokenSel := NewMaglevSelector()
dataNodeNum := 10
for i := 0; i < dataNodeNum; i++ {
fullSel.AddNode(&databasev1.Node{
@@ -114,7 +112,7 @@ func TestMaglevSelector_DiffNode(t *testing.T) {
}
func BenchmarkMaglevSelector_Pick(b *testing.B) {
- sel, _ := NewMaglevSelector()
+ sel := NewMaglevSelector()
dataNodeNum := 10
for i := 0; i < dataNodeNum; i++ {
sel.AddNode(&databasev1.Node{