This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch stream-trace
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/stream-trace by this push:
     new c98c8a7c Introduce more query tracing and node selector
c98c8a7c is described below

commit c98c8a7cbca3c460644bd427aa2756ebb3396100
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon Jul 15 06:00:41 2024 +0000

    Introduce more query tracing and node selector
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 CHANGES.md                                         |   3 +
 pkg/cmdsetup/liaison.go                            |   3 +-
 pkg/node/interface.go                              |   3 +
 pkg/node/maglev.go                                 |   2 +
 pkg/node/round_robin.go                            | 177 +++++++++++++++++++++
 pkg/node/round_robin_test.go                       | 112 +++++++++++++
 .../logical/stream/stream_plan_distributed.go      |  29 +++-
 7 files changed, 323 insertions(+), 6 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index be310f0e..8ab401c5 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -15,6 +15,9 @@ Release Notes.
 - Add the measure query trace.
 - Assign a separate lookup table to each group in the maglev selector.
 - Convert the async local pipeline to a sync pipeline.
+- Add the stream query trace.
+- Add the topN query trace.
+- Introduce the round-robin selector to Liaison Node.
 
 ### Bugs
 
diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go
index 67cd942a..460c8544 100644
--- a/pkg/cmdsetup/liaison.go
+++ b/pkg/cmdsetup/liaison.go
@@ -46,7 +46,7 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
        }
        pipeline := pub.New(metaSvc)
        localPipeline := queue.Local()
-       nodeSel := node.NewMaglevSelector()
+       nodeSel := node.NewRoundRobinSelector()
        nodeRegistry := grpc.NewClusterNodeRegistry(pipeline, nodeSel)
        grpcServer := grpc.NewServer(ctx, pipeline, localPipeline, metaSvc, 
nodeRegistry)
        profSvc := observability.NewProfService()
@@ -77,6 +77,7 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
                Version: version.Build(),
                Short:   "Run as the liaison server",
                RunE: func(_ *cobra.Command, _ []string) (err error) {
+                       defer nodeSel.Close()
                        node, err := common.GenerateNode(grpcServer.GetPort(), 
httpServer.GetPort())
                        if err != nil {
                                return err
diff --git a/pkg/node/interface.go b/pkg/node/interface.go
index 26ec5cc2..1a4f5882 100644
--- a/pkg/node/interface.go
+++ b/pkg/node/interface.go
@@ -39,6 +39,7 @@ type Selector interface {
        AddNode(node *databasev1.Node)
        RemoveNode(node *databasev1.Node)
        Pick(group, name string, shardID uint32) (string, error)
+       Close()
 }
 
 // NewPickFirstSelector returns a simple selector that always returns the 
first node if exists.
@@ -55,6 +56,8 @@ type pickFirstSelector struct {
        mu        sync.RWMutex
 }
 
+// Close is a no-op for pickFirstSelector; the method body is empty.
+func (p *pickFirstSelector) Close() {}
+
 func (p *pickFirstSelector) AddNode(node *databasev1.Node) {
        nodeID := node.GetMetadata().GetName()
        p.mu.RLock()
diff --git a/pkg/node/maglev.go b/pkg/node/maglev.go
index fea2c5b2..4ae21740 100644
--- a/pkg/node/maglev.go
+++ b/pkg/node/maglev.go
@@ -37,6 +37,8 @@ type maglevSelector struct {
        mutex   sync.RWMutex
 }
 
+// Close is a no-op for maglevSelector; the method body is empty.
+func (m *maglevSelector) Close() {}
+
 func (m *maglevSelector) AddNode(node *databasev1.Node) {
        m.mutex.Lock()
        defer m.mutex.Unlock()
diff --git a/pkg/node/round_robin.go b/pkg/node/round_robin.go
new file mode 100644
index 00000000..da85c210
--- /dev/null
+++ b/pkg/node/round_robin.go
@@ -0,0 +1,177 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package node
+
+import (
+       "fmt"
+       "slices"
+       "sort"
+       "strings"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "github.com/pkg/errors"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+const (
+       // expiredKeyCleanupInterval is the period of the ticker that triggers
+       // cleanupExpiredEntries.
+       expiredKeyCleanupInterval = 1 * time.Hour
+       // keyTTL is how long a lookup-table entry may go without being accessed
+       // before cleanupExpiredEntries deletes it.
+       keyTTL                    = 24 * time.Hour
+)
+
+// roundRobinSelector picks a node for a (group, shardID) key by mapping the
+// key's index in the lookup table onto the sorted list of node names.
+type roundRobinSelector struct {
+       clock       timestamp.Clock // time source for lastAccess stamps and the cleanup ticker
+       closeCh     chan struct{} // closed by Close to stop the cleanup goroutine
+       lookupTable sync.Map // maps key to *tableEntry
+       nodes       []string // node names, kept sorted by AddNode
+       mu          sync.RWMutex // guards nodes
+       once        sync.Once // starts the cleanup goroutine at most once
+       tMu         sync.Mutex // serializes lookup-table rebuilds in Pick and cleanupExpiredEntries
+}
+
+// Close stops the background cleanup goroutine by closing closeCh.
+//
+// It is safe to call more than once, and on a selector whose closeCh was never
+// initialized: tMu serializes callers so the channel is closed at most once.
+func (r *roundRobinSelector) Close() {
+       r.tMu.Lock()
+       defer r.tMu.Unlock()
+       if r.closeCh == nil {
+               return
+       }
+       select {
+       case <-r.closeCh:
+               // Already closed by an earlier call; closing again would panic.
+       default:
+               close(r.closeCh)
+       }
+}
+
+// NewRoundRobinSelector builds a round-robin Selector with an empty node list,
+// a real clock, and an open close channel.
+func NewRoundRobinSelector() Selector {
+       return &roundRobinSelector{
+               closeCh: make(chan struct{}),
+               clock:   timestamp.NewClock(),
+               nodes:   make([]string, 0),
+       }
+}
+
+// AddNode registers the node's name, keeping nodes sorted and free of
+// duplicates so that adding the same node twice does not skew selection.
+// The name is read through the nil-safe generated getters.
+func (r *roundRobinSelector) AddNode(node *databasev1.Node) {
+       name := node.GetMetadata().GetName()
+       r.mu.Lock()
+       defer r.mu.Unlock()
+       // nodes is always sorted, so a binary search finds the insertion point.
+       pos := sort.SearchStrings(r.nodes, name)
+       if pos < len(r.nodes) && r.nodes[pos] == name {
+               return
+       }
+       r.nodes = slices.Insert(r.nodes, pos, name)
+}
+
+// RemoveNode drops the first occurrence of the node's name, if present. The
+// relative order of the remaining names is preserved, so nodes stays sorted.
+// The name is read through the nil-safe generated getters.
+func (r *roundRobinSelector) RemoveNode(node *databasev1.Node) {
+       name := node.GetMetadata().GetName()
+       r.mu.Lock()
+       defer r.mu.Unlock()
+       if i := slices.Index(r.nodes, name); i >= 0 {
+               r.nodes = slices.Delete(r.nodes, i, i+1)
+       }
+}
+
+// Pick returns the node assigned to the given group and shard. The name
+// argument is ignored. It returns an error when no nodes are registered.
+//
+// The first pick of an unseen key rebuilds the lookup table: all known keys
+// plus the new one are sorted by (group, shardID) and each key is assigned its
+// position in that order, which selectNode maps onto the node list.
+func (r *roundRobinSelector) Pick(group, _ string, shardID uint32) (string, error) {
+       r.mu.RLock()
+       defer r.mu.RUnlock()
+       k := key{group: group, shardID: shardID}
+       if len(r.nodes) == 0 {
+               return "", errors.New("no nodes available")
+       }
+       if entry, ok := r.lookupTable.Load(k); ok {
+               return r.selectNode(entry), nil
+       }
+       r.tMu.Lock()
+       defer r.tMu.Unlock()
+       // Re-check under tMu: another goroutine may have inserted k meanwhile.
+       if entry, ok := r.lookupTable.Load(k); ok {
+               return r.selectNode(entry), nil
+       }
+
+       keys := []key{k}
+       r.lookupTable.Range(func(existing, _ any) bool {
+               keys = append(keys, existing.(key))
+               return true
+       })
+       slices.SortFunc(keys, func(a, b key) int {
+               if n := strings.Compare(a.group, b.group); n != 0 {
+                       return n
+               }
+               // Compare explicitly instead of subtracting: int(uint32) can wrap
+               // on platforms where int is 32 bits wide.
+               switch {
+               case a.shardID < b.shardID:
+                       return -1
+               case a.shardID > b.shardID:
+                       return 1
+               default:
+                       return 0
+               }
+       })
+       for i := range keys {
+               loaded, ok := r.lookupTable.Load(keys[i])
+               if !ok {
+                       r.lookupTable.Store(keys[i], r.newTableEntry(i))
+                       continue
+               }
+               old := loaded.(*tableEntry)
+               if old.index == i {
+                       continue
+               }
+               // Publish a fresh entry instead of mutating index in place: the
+               // fast path above reads index without holding tMu. The lastAccess
+               // pointer is shared so the access time carries over.
+               r.lookupTable.Store(keys[i], &tableEntry{index: i, lastAccess: old.lastAccess})
+       }
+       r.once.Do(r.startCleanupTicker)
+       if entry, ok := r.lookupTable.Load(k); ok {
+               return r.selectNode(entry), nil
+       }
+       // Unreachable: k was stored by the loop above while tMu is held.
+       panic(fmt.Sprintf("key %v not found", k))
+}
+
+// selectNode stamps the entry with the current clock time and returns the
+// node at the entry's index, wrapped around the node list. The entry must hold
+// a *tableEntry. Pick calls it with r.mu read-locked and a non-empty node list.
+func (r *roundRobinSelector) selectNode(entry any) string {
+       te := entry.(*tableEntry)
+       accessed := r.clock.Now()
+       te.lastAccess.Store(&accessed)
+       pos := te.index % len(r.nodes)
+       return r.nodes[pos]
+}
+
+// key identifies a lookup-table slot by group and shard ID.
+type key struct {
+       group   string
+       shardID uint32
+}
+
+// tableEntry is the value stored in roundRobinSelector.lookupTable for a key.
+type tableEntry struct {
+       lastAccess *atomic.Pointer[time.Time] // set on creation and refreshed by every selectNode call
+       index      int // position used by selectNode, taken modulo the node count
+}
+
+// newTableEntry returns an entry for the given index whose lastAccess is
+// initialized to the selector clock's current time.
+func (r *roundRobinSelector) newTableEntry(index int) *tableEntry {
+       created := r.clock.Now()
+       accessed := new(atomic.Pointer[time.Time])
+       accessed.Store(&created)
+       return &tableEntry{lastAccess: accessed, index: index}
+}
+
+// cleanupExpiredEntries deletes every lookup-table entry whose last access is
+// more than keyTTL before the clock time sampled at the start of the call.
+// It holds tMu for the whole sweep.
+func (r *roundRobinSelector) cleanupExpiredEntries() {
+       sampled := r.clock.Now()
+       r.tMu.Lock()
+       defer r.tMu.Unlock()
+
+       r.lookupTable.Range(func(k, v any) bool {
+               last := *v.(*tableEntry).lastAccess.Load()
+               if idle := sampled.Sub(last); idle > keyTTL {
+                       r.lookupTable.Delete(k)
+               }
+               return true
+       })
+}
+
+// startCleanupTicker launches the goroutine that calls cleanupExpiredEntries
+// on every tick of expiredKeyCleanupInterval until closeCh is closed. The
+// ticker is stopped when the goroutine exits.
+func (r *roundRobinSelector) startCleanupTicker() {
+       ticker := r.clock.Ticker(expiredKeyCleanupInterval)
+       go func() {
+               defer ticker.Stop()
+               // Loop so cleanup keeps running: a bare select would handle only
+               // the first tick and then let the goroutine exit.
+               for {
+                       select {
+                       case <-r.closeCh:
+                               return
+                       case <-ticker.C:
+                               r.cleanupExpiredEntries()
+                       }
+               }
+       }()
+}
diff --git a/pkg/node/round_robin_test.go b/pkg/node/round_robin_test.go
new file mode 100644
index 00000000..69c1822a
--- /dev/null
+++ b/pkg/node/round_robin_test.go
@@ -0,0 +1,112 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package node
+
+import (
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+// TestPickEmptySelector verifies that Pick fails when no node is registered.
+func TestPickEmptySelector(t *testing.T) {
+       _, pickErr := NewRoundRobinSelector().Pick("group1", "", 0)
+       assert.Error(t, pickErr)
+}
+
+// TestPickSingleSelection verifies that the only registered node is picked.
+func TestPickSingleSelection(t *testing.T) {
+       selector := NewRoundRobinSelector()
+       // Stop the cleanup goroutine that the first successful Pick starts.
+       defer selector.Close()
+       selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node1"}})
+       node, err := selector.Pick("group1", "", 0)
+       assert.NoError(t, err)
+       assert.Equal(t, "node1", node)
+}
+
+func TestPickMultipleSelections(t *testing.T) {
+       selector := NewRoundRobinSelector()
+       selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: 
"node1"}})
+       selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: 
"node2"}})
+       // load data
+       _, err := selector.Pick("group1", "", 0)
+       assert.NoError(t, err)
+       _, err = selector.Pick("group1", "", 1)
+       assert.NoError(t, err)
+       node1, err := selector.Pick("group1", "", 0)
+       assert.NoError(t, err)
+       node2, err := selector.Pick("group1", "", 1)
+       assert.NoError(t, err)
+       assert.NotEqual(t, node1, node2, "Different shardIDs in the same group 
should not result in the same node")
+}
+
+// TestPickNodeRemoval verifies that a removed node is no longer picked.
+func TestPickNodeRemoval(t *testing.T) {
+       selector := NewRoundRobinSelector()
+       // Stop the cleanup goroutine that the first successful Pick starts.
+       defer selector.Close()
+       selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node1"}})
+       selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node2"}})
+       selector.RemoveNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node1"}})
+       node, err := selector.Pick("group1", "", 0)
+       assert.NoError(t, err)
+       assert.Equal(t, "node2", node)
+}
+
+// TestPickConsistentSelectionAfterRemoval verifies which node a known key maps
+// to before and after one of three nodes is removed.
+func TestPickConsistentSelectionAfterRemoval(t *testing.T) {
+       selector := NewRoundRobinSelector()
+       // Stop the cleanup goroutine that the first successful Pick starts.
+       defer selector.Close()
+       selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node1"}})
+       selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node2"}})
+       selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node3"}})
+       _, err := selector.Pick("group1", "", 0)
+       assert.NoError(t, err)
+       _, err = selector.Pick("group1", "", 1)
+       assert.NoError(t, err)
+       // Shard 1 has index 1, which selects node2 out of three nodes.
+       node, err := selector.Pick("group1", "", 1)
+       assert.NoError(t, err)
+       assert.Equal(t, "node2", node)
+       selector.RemoveNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node2"}})
+       // With two nodes left, index 1 selects node3.
+       node, err = selector.Pick("group1", "", 1)
+       assert.NoError(t, err)
+       assert.Equal(t, "node3", node)
+}
+
+// TestCleanupExpiredEntries verifies that cleanupExpiredEntries deletes an
+// entry last accessed more than keyTTL ago and keeps a recently added one.
+func TestCleanupExpiredEntries(t *testing.T) {
+       mc := timestamp.NewMockClock()
+       mc.Set(time.Date(1970, time.January, 1, 0, 0, 0, 0, time.Local))
+       selector := &roundRobinSelector{
+               nodes: make([]string, 0),
+               clock: mc,
+               // Without a close channel the cleanup goroutine started by Pick
+               // could never be stopped.
+               closeCh: make(chan struct{}),
+       }
+       defer selector.Close()
+       selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node1"}})
+       selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node2"}})
+       _, err := selector.Pick("group1", "", 0)
+       assert.NoError(t, err)
+       _, ok := selector.lookupTable.Load(key{group: "group1", shardID: 0})
+       assert.True(t, ok)
+       // Advance past keyTTL so the shard-0 entry becomes stale.
+       mc.Add(25 * time.Hour)
+       _, err = selector.Pick("group1", "", 1)
+       assert.NoError(t, err)
+       _, ok = selector.lookupTable.Load(key{group: "group1", shardID: 1})
+       assert.True(t, ok)
+       selector.cleanupExpiredEntries()
+       _, ok = selector.lookupTable.Load(key{group: "group1", shardID: 0})
+       assert.False(t, ok)
+       _, ok = selector.lookupTable.Load(key{group: "group1", shardID: 1})
+       assert.True(t, ok)
+}
diff --git a/pkg/query/logical/stream/stream_plan_distributed.go 
b/pkg/query/logical/stream/stream_plan_distributed.go
index 26380542..55a6b5fb 100644
--- a/pkg/query/logical/stream/stream_plan_distributed.go
+++ b/pkg/query/logical/stream/stream_plan_distributed.go
@@ -31,7 +31,9 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/iter/sort"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query"
        "github.com/apache/skywalking-banyandb/pkg/query/executor"
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
 )
@@ -129,14 +131,28 @@ type distributedPlan struct {
 
 func (t *distributedPlan) Close() {}
 
-func (t *distributedPlan) Execute(ctx context.Context) ([]*streamv1.Element, 
error) {
+func (t *distributedPlan) Execute(ctx context.Context) (ee 
[]*streamv1.Element, err error) {
        dctx := executor.FromDistributedExecutionContext(ctx)
-       query := proto.Clone(t.queryTemplate).(*streamv1.QueryRequest)
-       query.TimeRange = dctx.TimeRange()
+       queryRequest := proto.Clone(t.queryTemplate).(*streamv1.QueryRequest)
+       queryRequest.TimeRange = dctx.TimeRange()
        if t.maxElementSize > 0 {
-               query.Limit = t.maxElementSize
+               queryRequest.Limit = t.maxElementSize
        }
-       ff, err := dctx.Broadcast(defaultQueryTimeout, data.TopicStreamQuery, 
bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query))
+       tracer := query.GetTracer(ctx)
+       var span *query.Span
+       if tracer != nil {
+               span, _ = tracer.StartSpan(ctx, "distributed-client")
+               queryRequest.Trace = true
+               span.Tag("request", 
convert.BytesToString(logger.Proto(queryRequest)))
+               defer func() {
+                       if err != nil {
+                               span.Error(err)
+                       } else {
+                               span.Stop()
+                       }
+               }()
+       }
+       ff, err := dctx.Broadcast(defaultQueryTimeout, data.TopicStreamQuery, 
bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), queryRequest))
        if err != nil {
                return nil, err
        }
@@ -151,6 +167,9 @@ func (t *distributedPlan) Execute(ctx context.Context) 
([]*streamv1.Element, err
                                continue
                        }
                        resp := d.(*streamv1.QueryResponse)
+                       if span != nil {
+                               span.AddSubTrace(resp.Trace)
+                       }
                        see = append(see,
                                newSortableElements(resp.Elements, 
t.sortByTime, t.sortTagSpec))
                }

Reply via email to