This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch stream-trace
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/stream-trace by this push:
new c98c8a7c Introduce more query tracing and node selector
c98c8a7c is described below
commit c98c8a7cbca3c460644bd427aa2756ebb3396100
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon Jul 15 06:00:41 2024 +0000
Introduce more query tracing and node selector
Signed-off-by: Gao Hongtao <[email protected]>
---
CHANGES.md | 3 +
pkg/cmdsetup/liaison.go | 3 +-
pkg/node/interface.go | 3 +
pkg/node/maglev.go | 2 +
pkg/node/round_robin.go | 177 +++++++++++++++++++++
pkg/node/round_robin_test.go | 112 +++++++++++++
.../logical/stream/stream_plan_distributed.go | 29 +++-
7 files changed, 323 insertions(+), 6 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index be310f0e..8ab401c5 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -15,6 +15,9 @@ Release Notes.
- Add the measure query trace.
- Assign a separate lookup table to each group in the maglev selector.
- Convert the async local pipeline to a sync pipeline.
+- Add the stream query trace.
+- Add the topN query trace.
+- Introduce the round-robin selector to Liaison Node.
### Bugs
diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go
index 67cd942a..460c8544 100644
--- a/pkg/cmdsetup/liaison.go
+++ b/pkg/cmdsetup/liaison.go
@@ -46,7 +46,7 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
}
pipeline := pub.New(metaSvc)
localPipeline := queue.Local()
- nodeSel := node.NewMaglevSelector()
+ nodeSel := node.NewRoundRobinSelector()
nodeRegistry := grpc.NewClusterNodeRegistry(pipeline, nodeSel)
grpcServer := grpc.NewServer(ctx, pipeline, localPipeline, metaSvc,
nodeRegistry)
profSvc := observability.NewProfService()
@@ -77,6 +77,7 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
Version: version.Build(),
Short: "Run as the liaison server",
RunE: func(_ *cobra.Command, _ []string) (err error) {
+ defer nodeSel.Close()
node, err := common.GenerateNode(grpcServer.GetPort(),
httpServer.GetPort())
if err != nil {
return err
diff --git a/pkg/node/interface.go b/pkg/node/interface.go
index 26ec5cc2..1a4f5882 100644
--- a/pkg/node/interface.go
+++ b/pkg/node/interface.go
@@ -39,6 +39,7 @@ type Selector interface {
AddNode(node *databasev1.Node)
RemoveNode(node *databasev1.Node)
Pick(group, name string, shardID uint32) (string, error)
+ Close()
}
// NewPickFirstSelector returns a simple selector that always returns the
first node if exists.
@@ -55,6 +56,8 @@ type pickFirstSelector struct {
mu sync.RWMutex
}
+// Close satisfies the Selector interface; its body is empty, so calling it
+// has no effect.
+func (p *pickFirstSelector) Close() {}
+
func (p *pickFirstSelector) AddNode(node *databasev1.Node) {
nodeID := node.GetMetadata().GetName()
p.mu.RLock()
diff --git a/pkg/node/maglev.go b/pkg/node/maglev.go
index fea2c5b2..4ae21740 100644
--- a/pkg/node/maglev.go
+++ b/pkg/node/maglev.go
@@ -37,6 +37,8 @@ type maglevSelector struct {
mutex sync.RWMutex
}
+// Close satisfies the Selector interface; its body is empty, so calling it
+// has no effect.
+func (m *maglevSelector) Close() {}
+
func (m *maglevSelector) AddNode(node *databasev1.Node) {
m.mutex.Lock()
defer m.mutex.Unlock()
diff --git a/pkg/node/round_robin.go b/pkg/node/round_robin.go
new file mode 100644
index 00000000..da85c210
--- /dev/null
+++ b/pkg/node/round_robin.go
@@ -0,0 +1,177 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package node
+
+import (
+ "fmt"
+ "slices"
+ "sort"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/pkg/errors"
+
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+const (
+ // expiredKeyCleanupInterval is the period of the ticker created in
+ // startCleanupTicker that triggers cleanupExpiredEntries.
+ expiredKeyCleanupInterval = 1 * time.Hour
+ // keyTTL is the idle time after which cleanupExpiredEntries deletes a
+ // lookup-table entry, measured from the entry's lastAccess.
+ keyTTL = 24 * time.Hour
+)
+
+// roundRobinSelector assigns every (group, shardID) key a stable index and
+// maps that index onto the sorted node list with a modulo.
+type roundRobinSelector struct {
+ // clock supplies the time for lastAccess stamps and the cleanup ticker.
+ clock timestamp.Clock
+ // closeCh is closed by Close; the cleanup goroutine returns once it is
+ // readable.
+ closeCh chan struct{}
+ // lookupTable maps key values to *tableEntry values.
+ lookupTable sync.Map
+ // nodes holds the node names; AddNode re-sorts it after every append.
+ nodes []string
+ // mu guards nodes: writers take the write lock, Pick takes the read lock.
+ mu sync.RWMutex
+ // once ensures startCleanupTicker is run at most one time from Pick.
+ once sync.Once
+ // tMu serializes the lookup-table rebuild in Pick with
+ // cleanupExpiredEntries.
+ tMu sync.Mutex
+}
+
+// Close signals the background cleanup goroutine to exit by closing closeCh.
+// It may be called more than once, and on a selector whose closeCh was never
+// initialized (for example one built from a struct literal): closing a nil or
+// an already-closed channel would otherwise panic.
+func (r *roundRobinSelector) Close() {
+	// Serialize concurrent Close calls so the "already closed?" probe and the
+	// close itself cannot interleave.
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	if r.closeCh == nil {
+		return
+	}
+	select {
+	case <-r.closeCh:
+		// Nothing ever sends on closeCh, so a successful receive means an
+		// earlier call already closed it.
+	default:
+		close(r.closeCh)
+	}
+}
+
+// NewRoundRobinSelector returns a Selector backed by a roundRobinSelector that
+// starts with an empty node list, a clock obtained from timestamp.NewClock and
+// an open close channel.
+func NewRoundRobinSelector() Selector {
+	return &roundRobinSelector{
+		closeCh: make(chan struct{}),
+		clock:   timestamp.NewClock(),
+		nodes:   make([]string, 0),
+	}
+}
+
+// AddNode registers the node's name and keeps the node list sorted so that
+// an index maps to the same node regardless of registration order.
+//
+// A name that is already registered is ignored. A duplicate would occupy two
+// slots in the modulo rotation and would outlive a later RemoveNode, which
+// deletes only the first match.
+func (r *roundRobinSelector) AddNode(node *databasev1.Node) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	name := node.Metadata.Name
+	if slices.Contains(r.nodes, name) {
+		return
+	}
+	r.nodes = append(r.nodes, name)
+	sort.Strings(r.nodes)
+}
+
+// RemoveNode deletes the first occurrence of the node's name from the node
+// list and leaves the order of the remaining names untouched. A name that is
+// not registered is ignored.
+func (r *roundRobinSelector) RemoveNode(node *databasev1.Node) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	pos := slices.IndexFunc(r.nodes, func(candidate string) bool {
+		return candidate == node.Metadata.Name
+	})
+	if pos < 0 {
+		return
+	}
+	r.nodes = append(r.nodes[:pos], r.nodes[pos+1:]...)
+}
+
+// Pick returns the node assigned to the (group, shardID) pair; the second
+// argument (the resource name) is ignored. It returns an error when no node
+// is registered.
+//
+// Known keys are served from the lookup table. The first time a key is seen,
+// all keys (existing ones plus the new one) are sorted by group and then by
+// shardID, and every key receives its position in that order as its index.
+// The node is then chosen as nodes[index % len(nodes)].
+func (r *roundRobinSelector) Pick(group, _ string, shardID uint32) (string, error) {
+	r.mu.RLock()
+	defer r.mu.RUnlock()
+	k := key{group: group, shardID: shardID}
+	if len(r.nodes) == 0 {
+		return "", errors.New("no nodes available")
+	}
+	// Fast path: the key already has an entry.
+	if entry, ok := r.lookupTable.Load(k); ok {
+		return r.selectNode(entry), nil
+	}
+	r.tMu.Lock()
+	defer r.tMu.Unlock()
+	// Re-check under tMu: another goroutine may have inserted the key while
+	// this one was waiting for the lock.
+	if entry, ok := r.lookupTable.Load(k); ok {
+		return r.selectNode(entry), nil
+	}
+
+	keys := []key{k}
+	r.lookupTable.Range(func(existing, _ any) bool {
+		keys = append(keys, existing.(key))
+		return true
+	})
+	slices.SortFunc(keys, func(a, b key) int {
+		if n := strings.Compare(a.group, b.group); n != 0 {
+			return n
+		}
+		// Compare explicitly instead of subtracting: converting uint32 to
+		// int and subtracting can overflow where int is 32 bits wide.
+		switch {
+		case a.shardID < b.shardID:
+			return -1
+		case a.shardID > b.shardID:
+			return 1
+		default:
+			return 0
+		}
+	})
+	for i := range keys {
+		loaded, ok := r.lookupTable.Load(keys[i])
+		if !ok {
+			r.lookupTable.Store(keys[i], r.newTableEntry(i))
+			continue
+		}
+		// Publish a replacement entry rather than mutating index in place:
+		// the fast path reads index without holding tMu, so an in-place
+		// write would be a data race. The lastAccess pointer is shared so
+		// the access time carries over.
+		if old := loaded.(*tableEntry); old.index != i {
+			r.lookupTable.Store(keys[i], &tableEntry{index: i, lastAccess: old.lastAccess})
+		}
+	}
+	r.once.Do(r.startCleanupTicker)
+	if entry, ok := r.lookupTable.Load(k); ok {
+		return r.selectNode(entry), nil
+	}
+	// Unreachable unless the invariant "k was just stored under tMu" is broken.
+	panic(fmt.Sprintf("key %v not found", k))
+}
+
+// selectNode records the current clock time as the entry's last access and
+// returns the node name at position index modulo the number of nodes. The
+// argument must hold a *tableEntry and r.nodes must not be empty.
+func (r *roundRobinSelector) selectNode(entry any) string {
+	te := entry.(*tableEntry)
+	accessedAt := r.clock.Now()
+	te.lastAccess.Store(&accessedAt)
+	slot := te.index % len(r.nodes)
+	return r.nodes[slot]
+}
+
+// key identifies a lookup-table entry: Pick builds it from the group and
+// shard ID it is called with.
+type key struct {
+ group string
+ shardID uint32
+}
+
+// tableEntry is the value stored in the lookup table for a key.
+type tableEntry struct {
+ // lastAccess holds the time of the most recent selectNode call for this
+ // entry; cleanupExpiredEntries compares it against keyTTL.
+ lastAccess *atomic.Pointer[time.Time]
+ // index is the key's position among all sorted keys; selectNode reduces
+ // it modulo the node count.
+ index int
+}
+
+// newTableEntry allocates a table entry carrying the given index whose last
+// access time is initialized to the selector clock's current time.
+func (r *roundRobinSelector) newTableEntry(index int) *tableEntry {
+	created := r.clock.Now()
+	entry := &tableEntry{
+		index:      index,
+		lastAccess: new(atomic.Pointer[time.Time]),
+	}
+	entry.lastAccess.Store(&created)
+	return entry
+}
+
+// cleanupExpiredEntries removes every lookup-table entry whose last access
+// lies more than keyTTL before the clock time sampled at the start of the
+// call. It holds tMu for the whole sweep.
+func (r *roundRobinSelector) cleanupExpiredEntries() {
+	reference := r.clock.Now()
+	r.tMu.Lock()
+	defer r.tMu.Unlock()
+
+	r.lookupTable.Range(func(k, v any) bool {
+		last := v.(*tableEntry).lastAccess.Load()
+		if idle := reference.Sub(*last); idle > keyTTL {
+			r.lookupTable.Delete(k)
+		}
+		return true
+	})
+}
+
+// startCleanupTicker launches the background goroutine that calls
+// cleanupExpiredEntries on every tick of a ticker with period
+// expiredKeyCleanupInterval. The goroutine runs until closeCh is closed.
+func (r *roundRobinSelector) startCleanupTicker() {
+	ticker := r.clock.Ticker(expiredKeyCleanupInterval)
+	go func() {
+		// Release the ticker on every exit path.
+		defer ticker.Stop()
+		// Loop so that cleanup runs on every tick: a bare select would
+		// handle a single tick and then let the goroutine exit, leaving
+		// later expired entries in the table forever.
+		for {
+			select {
+			case <-r.closeCh:
+				return
+			case <-ticker.C:
+				r.cleanupExpiredEntries()
+			}
+		}
+	}()
+}
diff --git a/pkg/node/round_robin_test.go b/pkg/node/round_robin_test.go
new file mode 100644
index 00000000..69c1822a
--- /dev/null
+++ b/pkg/node/round_robin_test.go
@@ -0,0 +1,112 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package node
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+// TestPickEmptySelector checks that Pick reports an error when no node has
+// been added to the selector.
+func TestPickEmptySelector(t *testing.T) {
+	_, pickErr := NewRoundRobinSelector().Pick("group1", "", 0)
+	assert.Error(t, pickErr)
+}
+
+// TestPickSingleSelection checks that the only registered node is returned.
+func TestPickSingleSelection(t *testing.T) {
+	selector := NewRoundRobinSelector()
+	// A successful Pick starts the cleanup goroutine; Close stops it so the
+	// test does not leak it.
+	defer selector.Close()
+	selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node1"}})
+	node, err := selector.Pick("group1", "", 0)
+	assert.NoError(t, err)
+	assert.Equal(t, "node1", node)
+}
+
+func TestPickMultipleSelections(t *testing.T) {
+ selector := NewRoundRobinSelector()
+ selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name:
"node1"}})
+ selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name:
"node2"}})
+ // load data
+ _, err := selector.Pick("group1", "", 0)
+ assert.NoError(t, err)
+ _, err = selector.Pick("group1", "", 1)
+ assert.NoError(t, err)
+ node1, err := selector.Pick("group1", "", 0)
+ assert.NoError(t, err)
+ node2, err := selector.Pick("group1", "", 1)
+ assert.NoError(t, err)
+ assert.NotEqual(t, node1, node2, "Different shardIDs in the same group
should not result in the same node")
+}
+
+// TestPickNodeRemoval checks that a removed node is no longer selected.
+func TestPickNodeRemoval(t *testing.T) {
+	selector := NewRoundRobinSelector()
+	// A successful Pick starts the cleanup goroutine; Close stops it so the
+	// test does not leak it.
+	defer selector.Close()
+	selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node1"}})
+	selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node2"}})
+	selector.RemoveNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node1"}})
+	node, err := selector.Pick("group1", "", 0)
+	assert.NoError(t, err)
+	assert.Equal(t, "node2", node)
+}
+
+// TestPickConsistentSelectionAfterRemoval checks that a key keeps its index
+// when a node is removed, so the pick moves to the node that now occupies
+// that slot.
+func TestPickConsistentSelectionAfterRemoval(t *testing.T) {
+	selector := NewRoundRobinSelector()
+	// A successful Pick starts the cleanup goroutine; Close stops it so the
+	// test does not leak it.
+	defer selector.Close()
+	selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node1"}})
+	selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node2"}})
+	selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node3"}})
+	_, err := selector.Pick("group1", "", 0)
+	assert.NoError(t, err)
+	_, err = selector.Pick("group1", "", 1)
+	assert.NoError(t, err)
+	node, err := selector.Pick("group1", "", 1)
+	assert.NoError(t, err)
+	assert.Equal(t, "node2", node)
+	selector.RemoveNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node2"}})
+	node, err = selector.Pick("group1", "", 1)
+	assert.NoError(t, err)
+	assert.Equal(t, "node3", node)
+}
+
+// TestCleanupExpiredEntries checks that cleanupExpiredEntries drops an entry
+// that has been idle for longer than keyTTL and keeps a recently used one.
+func TestCleanupExpiredEntries(t *testing.T) {
+	mc := timestamp.NewMockClock()
+	mc.Set(time.Date(1970, time.January, 1, 0, 0, 0, 0, time.Local))
+	selector := &roundRobinSelector{
+		nodes: make([]string, 0),
+		clock: mc,
+		// closeCh must be set so that Close can stop the cleanup goroutine.
+		closeCh: make(chan struct{}),
+	}
+	// A successful Pick starts the cleanup goroutine; Close stops it so the
+	// test does not leak it.
+	defer selector.Close()
+	selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node1"}})
+	selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node2"}})
+	_, err := selector.Pick("group1", "", 0)
+	assert.NoError(t, err)
+	_, ok := selector.lookupTable.Load(key{group: "group1", shardID: 0})
+	assert.True(t, ok)
+	// Move past keyTTL so the first entry is expired, then touch a second key.
+	mc.Add(25 * time.Hour)
+	_, err = selector.Pick("group1", "", 1)
+	assert.NoError(t, err)
+	_, ok = selector.lookupTable.Load(key{group: "group1", shardID: 1})
+	assert.True(t, ok)
+	selector.cleanupExpiredEntries()
+	_, ok = selector.lookupTable.Load(key{group: "group1", shardID: 0})
+	assert.False(t, ok)
+	_, ok = selector.lookupTable.Load(key{group: "group1", shardID: 1})
+	assert.True(t, ok)
+}
diff --git a/pkg/query/logical/stream/stream_plan_distributed.go
b/pkg/query/logical/stream/stream_plan_distributed.go
index 26380542..55a6b5fb 100644
--- a/pkg/query/logical/stream/stream_plan_distributed.go
+++ b/pkg/query/logical/stream/stream_plan_distributed.go
@@ -31,7 +31,9 @@ import (
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/iter/sort"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/query"
"github.com/apache/skywalking-banyandb/pkg/query/executor"
"github.com/apache/skywalking-banyandb/pkg/query/logical"
)
@@ -129,14 +131,28 @@ type distributedPlan struct {
func (t *distributedPlan) Close() {}
-func (t *distributedPlan) Execute(ctx context.Context) ([]*streamv1.Element,
error) {
+func (t *distributedPlan) Execute(ctx context.Context) (ee
[]*streamv1.Element, err error) {
dctx := executor.FromDistributedExecutionContext(ctx)
- query := proto.Clone(t.queryTemplate).(*streamv1.QueryRequest)
- query.TimeRange = dctx.TimeRange()
+ queryRequest := proto.Clone(t.queryTemplate).(*streamv1.QueryRequest)
+ queryRequest.TimeRange = dctx.TimeRange()
if t.maxElementSize > 0 {
- query.Limit = t.maxElementSize
+ queryRequest.Limit = t.maxElementSize
}
- ff, err := dctx.Broadcast(defaultQueryTimeout, data.TopicStreamQuery,
bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query))
+ tracer := query.GetTracer(ctx)
+ var span *query.Span
+ if tracer != nil {
+ span, _ = tracer.StartSpan(ctx, "distributed-client")
+ queryRequest.Trace = true
+ span.Tag("request",
convert.BytesToString(logger.Proto(queryRequest)))
+ defer func() {
+ if err != nil {
+ span.Error(err)
+ } else {
+ span.Stop()
+ }
+ }()
+ }
+ ff, err := dctx.Broadcast(defaultQueryTimeout, data.TopicStreamQuery,
bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), queryRequest))
if err != nil {
return nil, err
}
@@ -151,6 +167,9 @@ func (t *distributedPlan) Execute(ctx context.Context)
([]*streamv1.Element, err
continue
}
resp := d.(*streamv1.QueryResponse)
+ if span != nil {
+ span.AddSubTrace(resp.Trace)
+ }
see = append(see,
newSortableElements(resp.Elements,
t.sortByTime, t.sortTagSpec))
}