This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 285db188 Introduce more query tracing and node selector (#486)
285db188 is described below

commit 285db188e633c6b95b8c5c354e043db79658c147
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon Jul 15 14:24:24 2024 +0800

    Introduce more query tracing and node selector (#486)
    
    * Enable tracing stream
    
    * Introduce more query tracing and node selector
    
    Signed-off-by: Gao Hongtao <[email protected]>
    
    ---------
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 CHANGES.md                                         |   3 +
 api/proto/banyandb/measure/v1/topn.proto           |   5 +
 banyand/dquery/stream.go                           |  29 +++-
 banyand/liaison/grpc/measure.go                    |  23 ++-
 banyand/liaison/grpc/stream.go                     |  24 ++-
 banyand/measure/trace.go                           |   1 +
 banyand/query/processor.go                         |  26 ++-
 banyand/query/processor_topn.go                    |  52 +++++-
 banyand/stream/benchmark_test.go                   |  10 +-
 banyand/stream/query.go                            |  54 ++++---
 banyand/stream/query_test.go                       |   3 +-
 banyand/{measure => stream}/trace.go               |   6 +-
 docs/api-reference.md                              |   2 +
 pkg/cmdsetup/liaison.go                            |   3 +-
 pkg/node/interface.go                              |   3 +
 pkg/node/maglev.go                                 |   2 +
 pkg/node/round_robin.go                            | 177 +++++++++++++++++++++
 pkg/node/round_robin_test.go                       | 112 +++++++++++++
 pkg/pb/v1/metadata.go                              |   4 +-
 .../logical/stream/stream_plan_distributed.go      |  29 +++-
 .../logical/stream/stream_plan_indexscan_local.go  |   8 +-
 21 files changed, 522 insertions(+), 54 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index be310f0e..8ab401c5 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -15,6 +15,9 @@ Release Notes.
 - Add the measure query trace.
 - Assign a separate lookup table to each group in the maglev selector.
 - Convert the async local pipeline to a sync pipeline.
+- Add the stream query trace.
+- Add the topN query trace.
+- Introduce the round-robin selector to the liaison node.
 
 ### Bugs
 
diff --git a/api/proto/banyandb/measure/v1/topn.proto 
b/api/proto/banyandb/measure/v1/topn.proto
index f6366b1d..1b349bc6 100644
--- a/api/proto/banyandb/measure/v1/topn.proto
+++ b/api/proto/banyandb/measure/v1/topn.proto
@@ -19,6 +19,7 @@ syntax = "proto3";
 
 package banyandb.measure.v1;
 
+import "banyandb/common/v1/trace.proto";
 import "banyandb/model/v1/common.proto";
 import "banyandb/model/v1/query.proto";
 import "google/protobuf/timestamp.proto";
@@ -44,6 +45,8 @@ message TopNResponse {
   // lists contain a series topN lists ranked by timestamp
   // if agg_func in query request is specified, lists' size should be one.
   repeated TopNList lists = 1;
+  // trace contains the trace information of the query when trace is enabled
+  common.v1.Trace trace = 2;
 }
 
 // TopNRequest is the request contract for query.
@@ -63,4 +66,6 @@ message TopNRequest {
   repeated model.v1.Condition conditions = 6;
   // field_value_sort indicates how to sort fields
   model.v1.Sort field_value_sort = 7;
+  // trace is used to enable trace for the query
+  bool trace = 8;
 }
diff --git a/banyand/dquery/stream.go b/banyand/dquery/stream.go
index 5cfacdbd..8931ebb5 100644
--- a/banyand/dquery/stream.go
+++ b/banyand/dquery/stream.go
@@ -19,14 +19,16 @@ package dquery
 
 import (
        "context"
+       "errors"
        "time"
 
        "github.com/apache/skywalking-banyandb/api/common"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
        "github.com/apache/skywalking-banyandb/banyand/stream"
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/query"
        "github.com/apache/skywalking-banyandb/pkg/query/executor"
        logical_stream 
"github.com/apache/skywalking-banyandb/pkg/query/logical/stream"
 )
@@ -38,7 +40,8 @@ type streamQueryProcessor struct {
 }
 
 func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
-       now := time.Now().UnixNano()
+       n := time.Now()
+       now := n.UnixNano()
        queryCriteria, ok := message.Data().(*streamv1.QueryRequest)
        if !ok {
                resp = bus.NewMessage(bus.MessageID(now), 
common.NewError("invalid event data type"))
@@ -76,9 +79,30 @@ func (p *streamQueryProcessor) Rev(message bus.Message) 
(resp bus.Message) {
        if p.log.Debug().Enabled() {
                p.log.Debug().Str("plan", plan.String()).Msg("query plan")
        }
+       ctx := context.Background()
+       var tracer *query.Tracer
+       var span *query.Span
+       if queryCriteria.Trace {
+               tracer, ctx = query.NewTracer(ctx, n.Format(time.RFC3339Nano))
+               span, ctx = tracer.StartSpan(ctx, "distributed-%s", 
p.queryService.nodeID)
+               span.Tag("plan", plan.String())
+               defer func() {
+                       data := resp.Data()
+                       switch d := data.(type) {
+                       case *streamv1.QueryResponse:
+                               d.Trace = tracer.ToProto()
+                       case common.Error:
+                               span.Error(errors.New(d.Msg()))
+                               resp = bus.NewMessage(bus.MessageID(now), 
&streamv1.QueryResponse{Trace: tracer.ToProto()})
+                       default:
+                               panic("unexpected data type")
+                       }
+                       span.Stop()
+               }()
+       }
        se := plan.(executor.StreamExecutable)
        defer se.Close()
-       entities, err := 
se.Execute(executor.WithDistributedExecutionContext(context.Background(), 
&distributedContext{
+       entities, err := 
se.Execute(executor.WithDistributedExecutionContext(ctx, &distributedContext{
                Broadcaster: p.broadcaster,
                timeRange:   queryCriteria.TimeRange,
        }))
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index 175c687c..6bedb050 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -188,12 +188,27 @@ func (ms *measureService) Query(_ context.Context, req 
*measurev1.QueryRequest)
        return nil, nil
 }
 
-func (ms *measureService) TopN(_ context.Context, topNRequest 
*measurev1.TopNRequest) (*measurev1.TopNResponse, error) {
-       if err := timestamp.CheckTimeRange(topNRequest.GetTimeRange()); err != 
nil {
+func (ms *measureService) TopN(_ context.Context, topNRequest 
*measurev1.TopNRequest) (resp *measurev1.TopNResponse, err error) {
+       if err = timestamp.CheckTimeRange(topNRequest.GetTimeRange()); err != 
nil {
                return nil, status.Errorf(codes.InvalidArgument, "%v is invalid 
:%s", topNRequest.GetTimeRange(), err)
        }
-
-       message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), 
topNRequest)
+       now := time.Now()
+       if topNRequest.Trace {
+               ctx := context.TODO()
+               tracer, _ := query.NewTracer(ctx, now.Format(time.RFC3339Nano))
+               span, _ := tracer.StartSpan(ctx, "topn-grpc")
+               span.Tag("request", 
convert.BytesToString(logger.Proto(topNRequest)))
+               defer func() {
+                       if err != nil {
+                               span.Error(err)
+                       } else {
+                               span.AddSubTrace(resp.Trace)
+                               resp.Trace = tracer.ToProto()
+                       }
+                       span.Stop()
+               }()
+       }
+       message := bus.NewMessage(bus.MessageID(now.UnixNano()), topNRequest)
        feat, errQuery := ms.broadcaster.Publish(data.TopicTopNQuery, message)
        if errQuery != nil {
                return nil, errQuery
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index 0a8946cf..7ee22256 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -34,8 +34,10 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/accesslog"
        "github.com/apache/skywalking-banyandb/pkg/bus"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
@@ -139,15 +141,31 @@ func (s *streamService) Write(stream 
streamv1.StreamService_WriteServer) error {
 
 var emptyStreamQueryResponse = &streamv1.QueryResponse{Elements: 
make([]*streamv1.Element, 0)}
 
-func (s *streamService) Query(_ context.Context, req *streamv1.QueryRequest) 
(*streamv1.QueryResponse, error) {
+func (s *streamService) Query(_ context.Context, req *streamv1.QueryRequest) 
(resp *streamv1.QueryResponse, err error) {
        timeRange := req.GetTimeRange()
        if timeRange == nil {
                req.TimeRange = timestamp.DefaultTimeRange
        }
-       if err := timestamp.CheckTimeRange(req.GetTimeRange()); err != nil {
+       if err = timestamp.CheckTimeRange(req.GetTimeRange()); err != nil {
                return nil, status.Errorf(codes.InvalidArgument, "%v is invalid 
:%s", req.GetTimeRange(), err)
        }
-       message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), req)
+       now := time.Now()
+       if req.Trace {
+               ctx := context.TODO()
+               tracer, _ := query.NewTracer(ctx, now.Format(time.RFC3339Nano))
+               span, _ := tracer.StartSpan(ctx, "stream-grpc")
+               span.Tag("request", convert.BytesToString(logger.Proto(req)))
+               defer func() {
+                       if err != nil {
+                               span.Error(err)
+                       } else {
+                               span.AddSubTrace(resp.Trace)
+                               resp.Trace = tracer.ToProto()
+                       }
+                       span.Stop()
+               }()
+       }
+       message := bus.NewMessage(bus.MessageID(now.UnixNano()), req)
        feat, errQuery := s.broadcaster.Publish(data.TopicStreamQuery, message)
        if errQuery != nil {
                if errors.Is(errQuery, io.EOF) {
diff --git a/banyand/measure/trace.go b/banyand/measure/trace.go
index ccf51f3a..42d226cd 100644
--- a/banyand/measure/trace.go
+++ b/banyand/measure/trace.go
@@ -59,6 +59,7 @@ func startBlockScanSpan(ctx context.Context, sids int, parts 
[]*part, qr *queryR
        span, _ := tracer.StartSpan(ctx, "scan-blocks")
        span.Tag("series_num", fmt.Sprintf("%d", sids))
        span.Tag("part_header", partMetadataHeader)
+       span.Tag("part_num", fmt.Sprintf("%d", len(parts)))
        for i := range parts {
                span.Tag(fmt.Sprintf("part_%d_%s", parts[i].partMetadata.ID, 
parts[i].path),
                        parts[i].partMetadata.String())
diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index 7eb49d07..f1464e2c 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -71,7 +71,8 @@ type streamQueryProcessor struct {
 }
 
 func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
-       now := time.Now().UnixNano()
+       n := time.Now()
+       now := n.UnixNano()
        queryCriteria, ok := message.Data().(*streamv1.QueryRequest)
        if !ok {
                resp = bus.NewMessage(bus.MessageID(now), 
common.NewError("invalid event data type"))
@@ -115,9 +116,30 @@ func (p *streamQueryProcessor) Rev(message bus.Message) 
(resp bus.Message) {
        if p.log.Debug().Enabled() {
                p.log.Debug().Str("plan", plan.String()).Msg("query plan")
        }
+       ctx := context.Background()
+       var tracer *query.Tracer
+       var span *query.Span
+       if queryCriteria.Trace {
+               tracer, ctx = query.NewTracer(ctx, n.Format(time.RFC3339Nano))
+               span, ctx = tracer.StartSpan(ctx, "data-%s", 
p.queryService.nodeID)
+               span.Tag("plan", plan.String())
+               defer func() {
+                       data := resp.Data()
+                       switch d := data.(type) {
+                       case *streamv1.QueryResponse:
+                               d.Trace = tracer.ToProto()
+                       case common.Error:
+                               span.Error(errors.New(d.Msg()))
+                               resp = bus.NewMessage(bus.MessageID(now), 
&streamv1.QueryResponse{Trace: tracer.ToProto()})
+                       default:
+                               panic("unexpected data type")
+                       }
+                       span.Stop()
+               }()
+       }
        se := plan.(executor.StreamExecutable)
        defer se.Close()
-       entities, err := 
se.Execute(executor.WithStreamExecutionContext(context.Background(), ec))
+       entities, err := se.Execute(executor.WithStreamExecutionContext(ctx, 
ec))
        if err != nil {
                p.log.Error().Err(err).RawJSON("req", 
logger.Proto(queryCriteria)).Msg("fail to execute the query plan")
                resp = bus.NewMessage(bus.MessageID(now), 
common.NewError("execute the query plan for stream %s: %v", meta.GetName(), 
err))
diff --git a/banyand/query/processor_topn.go b/banyand/query/processor_topn.go
index d734e18b..7ebb1d48 100644
--- a/banyand/query/processor_topn.go
+++ b/banyand/query/processor_topn.go
@@ -20,6 +20,8 @@ package query
 import (
        "container/heap"
        "context"
+       "errors"
+       "fmt"
        "slices"
        "time"
 
@@ -34,6 +36,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/flow"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query"
        "github.com/apache/skywalking-banyandb/pkg/query/aggregation"
        "github.com/apache/skywalking-banyandb/pkg/query/executor"
        logical_measure 
"github.com/apache/skywalking-banyandb/pkg/query/logical/measure"
@@ -46,7 +49,8 @@ type topNQueryProcessor struct {
 
 func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
        request, ok := message.Data().(*measurev1.TopNRequest)
-       now := time.Now().UnixNano()
+       n := time.Now()
+       now := n.UnixNano()
        if !ok {
                t.log.Warn().Msg("invalid event data type")
                return
@@ -114,8 +118,28 @@ func (t *topNQueryProcessor) Rev(message bus.Message) 
(resp bus.Message) {
        if e := ml.Debug(); e.Enabled() {
                e.Str("plan", plan.String()).Msg("topn plan")
        }
-
-       mIterator, err := 
plan.(executor.MeasureExecutable).Execute(executor.WithMeasureExecutionContext(context.Background(),
 topNResultMeasure))
+       ctx := context.Background()
+       var tracer *query.Tracer
+       var span *query.Span
+       if request.Trace {
+               tracer, ctx = query.NewTracer(ctx, n.Format(time.RFC3339Nano))
+               span, ctx = tracer.StartSpan(ctx, "data-%s", 
t.queryService.nodeID)
+               span.Tag("plan", plan.String())
+               defer func() {
+                       data := resp.Data()
+                       switch d := data.(type) {
+                       case *measurev1.TopNResponse:
+                               d.Trace = tracer.ToProto()
+                       case common.Error:
+                               span.Error(errors.New(d.Msg()))
+                               resp = bus.NewMessage(bus.MessageID(now), 
&measurev1.TopNResponse{Trace: tracer.ToProto()})
+                       default:
+                               panic("unexpected data type")
+                       }
+                       span.Stop()
+               }()
+       }
+       mIterator, err := 
plan.(executor.MeasureExecutable).Execute(executor.WithMeasureExecutionContext(ctx,
 topNResultMeasure))
        if err != nil {
                ml.Error().Err(err).RawJSON("req", 
logger.Proto(request)).Msg("fail to close the topn plan")
                resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail 
to execute the topn plan for measure %s: %v", topNMetadata.GetName(), err))
@@ -128,12 +152,24 @@ func (t *topNQueryProcessor) Rev(message bus.Message) 
(resp bus.Message) {
        }()
 
        result := make([]*measurev1.DataPoint, 0)
-       for mIterator.Next() {
-               current := mIterator.Current()
-               if len(current) > 0 {
-                       result = append(result, current[0])
+       func() {
+               var r int
+               if tracer != nil {
+                       iterSpan, _ := tracer.StartSpan(ctx, "iterator")
+                       defer func() {
+                               iterSpan.Tag("rounds", fmt.Sprintf("%d", r))
+                               iterSpan.Tag("size", fmt.Sprintf("%d", 
len(result)))
+                               iterSpan.Stop()
+                       }()
                }
-       }
+               for mIterator.Next() {
+                       r++
+                       current := mIterator.Current()
+                       if len(current) > 0 {
+                               result = append(result, current[0])
+                       }
+               }
+       }()
 
        resp = bus.NewMessage(bus.MessageID(now), toTopNResponse(result))
        return
diff --git a/banyand/stream/benchmark_test.go b/banyand/stream/benchmark_test.go
index 823116bc..f2fb7c34 100644
--- a/banyand/stream/benchmark_test.go
+++ b/banyand/stream/benchmark_test.go
@@ -344,6 +344,7 @@ func generateStreamQueryOptions(p parameter, index 
mockIndex) pbv1.StreamQueryOp
 
 func BenchmarkFilter(b *testing.B) {
        b.ReportAllocs()
+       ctx := context.TODO()
        for _, p := range pList {
                esList, docsList, idx := generateData(p)
                db := write(b, p, esList, docsList)
@@ -351,24 +352,25 @@ func BenchmarkFilter(b *testing.B) {
                sqo := generateStreamQueryOptions(p, idx)
                sqo.Order = nil
                b.Run("filter-"+p.scenario, func(b *testing.B) {
-                       res, err := s.Query(context.TODO(), sqo)
+                       res, err := s.Query(ctx, sqo)
                        require.NoError(b, err)
-                       logicalstream.BuildElementsFromStreamResult(res)
+                       logicalstream.BuildElementsFromStreamResult(ctx, res)
                })
        }
 }
 
 func BenchmarkSort(b *testing.B) {
        b.ReportAllocs()
+       ctx := context.TODO()
        for _, p := range pList {
                esList, docsList, idx := generateData(p)
                db := write(b, p, esList, docsList)
                s := generateStream(db)
                sqo := generateStreamQueryOptions(p, idx)
                b.Run("sort-"+p.scenario, func(b *testing.B) {
-                       res, err := s.Query(context.TODO(), sqo)
+                       res, err := s.Query(ctx, sqo)
                        require.NoError(b, err)
-                       logicalstream.BuildElementsFromStreamResult(res)
+                       logicalstream.BuildElementsFromStreamResult(ctx, res)
                })
        }
 }
diff --git a/banyand/stream/query.go b/banyand/stream/query.go
index 8bd1f105..ef7925b9 100644
--- a/banyand/stream/query.go
+++ b/banyand/stream/query.go
@@ -36,6 +36,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/partition"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query"
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
 )
 
@@ -136,24 +137,24 @@ type queryResult struct {
        asc              bool
 }
 
-func (qr *queryResult) Pull() *pbv1.StreamResult {
+func (qr *queryResult) Pull(ctx context.Context) *pbv1.StreamResult {
        if qr.sortingIter == nil {
                qo := qr.qo
                sort.Slice(qo.sortedSids, func(i, j int) bool { return 
qo.sortedSids[i] < qo.sortedSids[j] })
-               return qr.load(qo)
+               return qr.load(ctx, qo)
        }
        if !qr.loaded {
                qr.elementIDsSorted = make([]uint64, 0, qr.qo.MaxElementSize)
-               return qr.loadSortingData()
+               return qr.loadSortingData(ctx)
        }
        if v := qr.nextValue(); v != nil {
                return v
        }
        qr.loaded = false
-       return qr.loadSortingData()
+       return qr.loadSortingData(ctx)
 }
 
-func (qr *queryResult) scanParts(qo queryOptions) error {
+func (qr *queryResult) scanParts(ctx context.Context, qo queryOptions) error {
        var parts []*part
        var n int
        for i := range qr.tabWrappers {
@@ -170,6 +171,8 @@ func (qr *queryResult) scanParts(qo queryOptions) error {
        }
        bma := generateBlockMetadataArray()
        defer releaseBlockMetadataArray(bma)
+       defFn := startBlockScanSpan(ctx, len(qo.sortedSids), parts, qr)
+       defer defFn()
        // TODO: cache tstIter
        var ti tstIter
        defer ti.reset()
@@ -190,9 +193,9 @@ func (qr *queryResult) scanParts(qo queryOptions) error {
        return nil
 }
 
-func (qr *queryResult) load(qo queryOptions) *pbv1.StreamResult {
+func (qr *queryResult) load(ctx context.Context, qo queryOptions) 
*pbv1.StreamResult {
        if !qr.loaded {
-               if err := qr.scanParts(qo); err != nil {
+               if err := qr.scanParts(ctx, qo); err != nil {
                        return &pbv1.StreamResult{
                                Error: err,
                        }
@@ -293,16 +296,34 @@ func (qr *queryResult) nextValue() *pbv1.StreamResult {
        return qr.mergeByTimestamp()
 }
 
-func (qr *queryResult) loadSortingData() *pbv1.StreamResult {
+func (qr *queryResult) loadSortingData(ctx context.Context) *pbv1.StreamResult 
{
        var qo queryOptions
        qo.StreamQueryOptions = qr.qo.StreamQueryOptions
-       if qr.qo.elementFilter != nil {
-               qo.elementFilter = roaring.NewPostingList()
-       }
+       qo.elementFilter = roaring.NewPostingList()
        qo.seriesToEntity = qr.qo.seriesToEntity
        qr.elementIDsSorted = qr.elementIDsSorted[:0]
-       for count := 1; qr.sortingIter.Next(); count++ {
+       count, searchedSize := 1, 0
+       tracer := query.GetTracer(ctx)
+       if tracer != nil {
+               span, _ := tracer.StartSpan(ctx, "load-sorting-data")
+               span.Tagf("max_element_size", "%d", qo.MaxElementSize)
+               if qr.qo.elementFilter != nil {
+                       span.Tag("filter_size", fmt.Sprintf("%d", 
qr.qo.elementFilter.Len()))
+               }
+               defer func() {
+                       span.Tagf("searched_size", "%d", searchedSize)
+                       span.Tagf("count", "%d", count)
+                       span.Stop()
+               }()
+       }
+       for ; qr.sortingIter.Next(); count++ {
+               searchedSize++
                val := qr.sortingIter.Val()
+               if qr.qo.elementFilter != nil && 
!qr.qo.elementFilter.Contains(val.DocID) {
+                       count--
+                       continue
+               }
+               qo.elementFilter.Insert(val.DocID)
                if val.Timestamp > qo.maxTimestamp {
                        qo.maxTimestamp = val.Timestamp
                }
@@ -310,9 +331,6 @@ func (qr *queryResult) loadSortingData() *pbv1.StreamResult 
{
                        qo.minTimestamp = val.Timestamp
                }
                qr.elementIDsSorted = append(qr.elementIDsSorted, val.DocID)
-               if qo.elementFilter != nil {
-                       qo.elementFilter.Insert(val.DocID)
-               }
 
                // Insertion sort
                insertPos, found := -1, false
@@ -338,10 +356,10 @@ func (qr *queryResult) loadSortingData() 
*pbv1.StreamResult {
                        break
                }
        }
-       if qo.elementFilter != nil {
-               _ = qo.elementFilter.Intersect(qr.qo.elementFilter)
+       if qo.elementFilter.IsEmpty() {
+               return nil
        }
-       return qr.load(qo)
+       return qr.load(ctx, qo)
 }
 
 func (qr *queryResult) releaseParts() {
diff --git a/banyand/stream/query_test.go b/banyand/stream/query_test.go
index dcfb6d5c..8cc5236d 100644
--- a/banyand/stream/query_test.go
+++ b/banyand/stream/query_test.go
@@ -315,8 +315,9 @@ func TestQueryResult(t *testing.T) {
                                        result.asc = tt.ascTS
                                }
                                var got []pbv1.StreamResult
+                               ctx := context.Background()
                                for {
-                                       r := result.Pull()
+                                       r := result.Pull(ctx)
                                        if r == nil {
                                                break
                                        }
diff --git a/banyand/measure/trace.go b/banyand/stream/trace.go
similarity index 93%
copy from banyand/measure/trace.go
copy to banyand/stream/trace.go
index ccf51f3a..ed428a67 100644
--- a/banyand/measure/trace.go
+++ b/banyand/stream/trace.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package measure
+package stream
 
 import (
        "context"
@@ -57,8 +57,12 @@ func startBlockScanSpan(ctx context.Context, sids int, parts 
[]*part, qr *queryR
        }
 
        span, _ := tracer.StartSpan(ctx, "scan-blocks")
+       if qr.qo.elementFilter != nil {
+               span.Tag("filter_size", fmt.Sprintf("%d", 
qr.qo.elementFilter.Len()))
+       }
        span.Tag("series_num", fmt.Sprintf("%d", sids))
        span.Tag("part_header", partMetadataHeader)
+       span.Tag("part_num", fmt.Sprintf("%d", len(parts)))
        for i := range parts {
                span.Tag(fmt.Sprintf("part_%d_%s", parts[i].partMetadata.ID, 
parts[i].path),
                        parts[i].partMetadata.String())
diff --git a/docs/api-reference.md b/docs/api-reference.md
index b837540d..2495c817 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -2561,6 +2561,7 @@ TopNRequest is the request contract for query.
 | agg | 
[banyandb.model.v1.AggregationFunction](#banyandb-model-v1-AggregationFunction) 
|  | agg aggregates lists grouped by field names in the time_range TODO 
validate enum defined_only |
 | conditions | [banyandb.model.v1.Condition](#banyandb-model-v1-Condition) | 
repeated | criteria select counters. Only equals are acceptable. |
 | field_value_sort | [banyandb.model.v1.Sort](#banyandb-model-v1-Sort) |  | 
field_value_sort indicates how to sort fields |
+| trace | [bool](#bool) |  | trace is used to enable trace for the query |
 
 
 
@@ -2576,6 +2577,7 @@ TopNResponse is the response for a query to the Query 
module.
 | Field | Type | Label | Description |
 | ----- | ---- | ----- | ----------- |
 | lists | [TopNList](#banyandb-measure-v1-TopNList) | repeated | lists contain 
a series topN lists ranked by timestamp if agg_func in query request is 
specified, lists&#39; size should be one. |
+| trace | [banyandb.common.v1.Trace](#banyandb-common-v1-Trace) |  | trace 
contains the trace information of the query when trace is enabled |
 
 
 
diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go
index 67cd942a..460c8544 100644
--- a/pkg/cmdsetup/liaison.go
+++ b/pkg/cmdsetup/liaison.go
@@ -46,7 +46,7 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
        }
        pipeline := pub.New(metaSvc)
        localPipeline := queue.Local()
-       nodeSel := node.NewMaglevSelector()
+       nodeSel := node.NewRoundRobinSelector()
        nodeRegistry := grpc.NewClusterNodeRegistry(pipeline, nodeSel)
        grpcServer := grpc.NewServer(ctx, pipeline, localPipeline, metaSvc, 
nodeRegistry)
        profSvc := observability.NewProfService()
@@ -77,6 +77,7 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
                Version: version.Build(),
                Short:   "Run as the liaison server",
                RunE: func(_ *cobra.Command, _ []string) (err error) {
+                       defer nodeSel.Close()
                        node, err := common.GenerateNode(grpcServer.GetPort(), 
httpServer.GetPort())
                        if err != nil {
                                return err
diff --git a/pkg/node/interface.go b/pkg/node/interface.go
index 26ec5cc2..1a4f5882 100644
--- a/pkg/node/interface.go
+++ b/pkg/node/interface.go
@@ -39,6 +39,7 @@ type Selector interface {
        AddNode(node *databasev1.Node)
        RemoveNode(node *databasev1.Node)
        Pick(group, name string, shardID uint32) (string, error)
+       Close()
 }
 
 // NewPickFirstSelector returns a simple selector that always returns the 
first node if exists.
@@ -55,6 +56,8 @@ type pickFirstSelector struct {
        mu        sync.RWMutex
 }
 
+func (p *pickFirstSelector) Close() {}
+
 func (p *pickFirstSelector) AddNode(node *databasev1.Node) {
        nodeID := node.GetMetadata().GetName()
        p.mu.RLock()
diff --git a/pkg/node/maglev.go b/pkg/node/maglev.go
index fea2c5b2..4ae21740 100644
--- a/pkg/node/maglev.go
+++ b/pkg/node/maglev.go
@@ -37,6 +37,8 @@ type maglevSelector struct {
        mutex   sync.RWMutex
 }
 
+func (m *maglevSelector) Close() {}
+
 func (m *maglevSelector) AddNode(node *databasev1.Node) {
        m.mutex.Lock()
        defer m.mutex.Unlock()
diff --git a/pkg/node/round_robin.go b/pkg/node/round_robin.go
new file mode 100644
index 00000000..da85c210
--- /dev/null
+++ b/pkg/node/round_robin.go
@@ -0,0 +1,177 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package node
+
+import (
+       "fmt"
+       "slices"
+       "sort"
+       "strings"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "github.com/pkg/errors"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+const (
+       // expiredKeyCleanupInterval is the period of the ticker created in
+       // startCleanupTicker, which drives cleanupExpiredEntries.
+       expiredKeyCleanupInterval = 1 * time.Hour
+       // keyTTL is how long a (group, shardID) key may go without being accessed
+       // before cleanupExpiredEntries deletes it from the lookup table.
+       keyTTL                    = 24 * time.Hour
+)
+
+// roundRobinSelector gives every (group, shardID) key an index equal to its rank
+// among all known keys, and maps that index onto the sorted node list modulo its
+// length, spreading shards across nodes in round-robin order.
+type roundRobinSelector struct {
+       clock       timestamp.Clock
+       // closeCh is closed by Close to stop the cleanup goroutine.
+       closeCh     chan struct{}
+       // lookupTable maps key -> *tableEntry.
+       lookupTable sync.Map
+       // nodes holds node names in sorted order; guarded by mu.
+       nodes       []string
+       mu          sync.RWMutex
+       // once starts the cleanup ticker the first time a key is inserted.
+       once        sync.Once
+       // closeOnce makes Close idempotent.
+       closeOnce   sync.Once
+       // tMu serializes structural changes to lookupTable (inserts, re-indexing,
+       // expiry cleanup).
+       tMu         sync.Mutex
+}
+
+// Close stops the background cleanup goroutine. It is safe to call more than
+// once, and it is a no-op for a selector constructed without a close channel.
+func (r *roundRobinSelector) Close() {
+       r.closeOnce.Do(func() {
+               // A nil channel cannot be closed; selectors built by struct
+               // literal (e.g. in tests) may not have one.
+               if r.closeCh != nil {
+                       close(r.closeCh)
+               }
+       })
+}
+
+// NewRoundRobinSelector creates a new round-robin selector.
+//
+// The selector starts with no nodes and stamps key accesses using
+// timestamp.NewClock(). Call Close to stop its background cleanup goroutine.
+func NewRoundRobinSelector() Selector {
+       return &roundRobinSelector{
+               clock:   timestamp.NewClock(),
+               closeCh: make(chan struct{}),
+               nodes:   make([]string, 0),
+       }
+}
+
+// AddNode registers node by name and keeps the node list sorted. Adding a node
+// that is already registered is a no-op, so repeated registration events cannot
+// create duplicates that would skew the round-robin distribution.
+func (r *roundRobinSelector) AddNode(node *databasev1.Node) {
+       name := node.GetMetadata().GetName()
+       r.mu.Lock()
+       defer r.mu.Unlock()
+       // nodes is always sorted, so binary search yields both membership and the
+       // insertion point that preserves the order.
+       i := sort.SearchStrings(r.nodes, name)
+       if i < len(r.nodes) && r.nodes[i] == name {
+               return
+       }
+       r.nodes = slices.Insert(r.nodes, i, name)
+}
+
+// RemoveNode unregisters node by name. Removing an unknown node is a no-op.
+func (r *roundRobinSelector) RemoveNode(node *databasev1.Node) {
+       name := node.GetMetadata().GetName()
+       r.mu.Lock()
+       defer r.mu.Unlock()
+       // AddNode keeps nodes sorted, so a binary search locates the entry.
+       if i, found := slices.BinarySearch(r.nodes, name); found {
+               r.nodes = slices.Delete(r.nodes, i, i+1)
+       }
+}
+
+// Pick returns the node that serves shardID of group; the name argument is
+// ignored. It returns an error when no nodes are registered.
+//
+// Each distinct key is indexed by its rank in the sorted set of known keys, and
+// the chosen node is nodes[index % len(nodes)]. Registering a new key re-ranks
+// every existing key, so assignments may shift as keys appear.
+func (r *roundRobinSelector) Pick(group, _ string, shardID uint32) (string, error) {
+       r.mu.RLock()
+       defer r.mu.RUnlock()
+       if len(r.nodes) == 0 {
+               return "", errors.New("no nodes available")
+       }
+       k := key{group: group, shardID: shardID}
+       // Fast path: the key is already indexed.
+       if entry, ok := r.lookupTable.Load(k); ok {
+               return r.selectNode(entry), nil
+       }
+       r.tMu.Lock()
+       defer r.tMu.Unlock()
+       // Re-check: another goroutine may have indexed k while we waited for tMu.
+       if entry, ok := r.lookupTable.Load(k); ok {
+               return r.selectNode(entry), nil
+       }
+
+       keys := []key{k}
+       r.lookupTable.Range(func(existing, _ any) bool {
+               keys = append(keys, existing.(key))
+               return true
+       })
+       slices.SortFunc(keys, compareLookupKeys)
+       for i := range keys {
+               if entry, ok := r.lookupTable.Load(keys[i]); ok {
+                       // Fast-path readers hold only mu.RLock (not tMu), so the
+                       // index must be published atomically.
+                       entry.(*tableEntry).index.Store(int64(i))
+               } else {
+                       r.lookupTable.Store(keys[i], r.newTableEntry(i))
+               }
+       }
+       r.once.Do(r.startCleanupTicker)
+       if entry, ok := r.lookupTable.Load(k); ok {
+               return r.selectNode(entry), nil
+       }
+       panic(fmt.Sprintf("key %v not found", k))
+}
+
+// compareLookupKeys orders keys by group, then by shard ID. Shard IDs are
+// compared directly instead of subtracted, which could overflow int on 32-bit
+// platforms.
+func compareLookupKeys(a, b key) int {
+       if n := strings.Compare(a.group, b.group); n != 0 {
+               return n
+       }
+       switch {
+       case a.shardID < b.shardID:
+               return -1
+       case a.shardID > b.shardID:
+               return 1
+       default:
+               return 0
+       }
+}
+
+// selectNode stamps the access time on entry (a *tableEntry) and maps its index
+// onto the current node list. Callers hold r.mu and have checked that r.nodes
+// is non-empty.
+func (r *roundRobinSelector) selectNode(entry any) string {
+       e := entry.(*tableEntry)
+       now := r.clock.Now()
+       e.lastAccess.Store(&now)
+       return r.nodes[int(e.index.Load())%len(r.nodes)]
+}
+
+// key identifies one shard of a group in the lookup table.
+type key struct {
+       group   string
+       shardID uint32
+}
+
+// tableEntry is the lookup-table value for a key.
+type tableEntry struct {
+       // lastAccess is the last time the key was picked; compared against keyTTL
+       // by cleanupExpiredEntries.
+       lastAccess *atomic.Pointer[time.Time]
+       // index is the key's rank among known keys. Pick rewrites it under tMu
+       // while other goroutines read it without tMu, hence the atomic type.
+       index      atomic.Int64
+}
+
+// newTableEntry builds an entry at the given index, stamped with the current time.
+func (r *roundRobinSelector) newTableEntry(index int) *tableEntry {
+       now := r.clock.Now()
+       e := &tableEntry{lastAccess: &atomic.Pointer[time.Time]{}}
+       e.lastAccess.Store(&now)
+       e.index.Store(int64(index))
+       return e
+}
+
+// cleanupExpiredEntries deletes every lookup-table key whose last access is more
+// than keyTTL before the clock's current time. It holds tMu so it cannot
+// interleave with Pick inserting or re-indexing keys.
+func (r *roundRobinSelector) cleanupExpiredEntries() {
+       now := r.clock.Now()
+       r.tMu.Lock()
+       defer r.tMu.Unlock()
+
+       r.lookupTable.Range(func(k, v any) bool {
+               lastAccess := *v.(*tableEntry).lastAccess.Load()
+               if expired := now.Sub(lastAccess) > keyTTL; expired {
+                       r.lookupTable.Delete(k)
+               }
+               return true
+       })
+}
+
+// startCleanupTicker launches a goroutine that runs cleanupExpiredEntries every
+// expiredKeyCleanupInterval until closeCh is closed by Close.
+func (r *roundRobinSelector) startCleanupTicker() {
+       ticker := r.clock.Ticker(expiredKeyCleanupInterval)
+       go func() {
+               defer ticker.Stop()
+               // Loop so cleanup runs on every tick, not only the first one.
+               for {
+                       select {
+                       case <-r.closeCh:
+                               return
+                       case <-ticker.C:
+                               r.cleanupExpiredEntries()
+                       }
+               }
+       }()
+}
diff --git a/pkg/node/round_robin_test.go b/pkg/node/round_robin_test.go
new file mode 100644
index 00000000..69c1822a
--- /dev/null
+++ b/pkg/node/round_robin_test.go
@@ -0,0 +1,112 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package node
+
+import (
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+// TestPickEmptySelector verifies that Pick fails when no nodes are registered.
+func TestPickEmptySelector(t *testing.T) {
+       sel := NewRoundRobinSelector()
+       _, pickErr := sel.Pick("group1", "", 0)
+       assert.Error(t, pickErr)
+}
+
+// TestPickSingleSelection verifies that a lone node serves every key.
+func TestPickSingleSelection(t *testing.T) {
+       selector := NewRoundRobinSelector()
+       // Pick starts a cleanup goroutine; stop it when the test ends.
+       defer selector.Close()
+       selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node1"}})
+       node, err := selector.Pick("group1", "", 0)
+       assert.NoError(t, err)
+       assert.Equal(t, "node1", node)
+}
+
+func TestPickMultipleSelections(t *testing.T) {
+       selector := NewRoundRobinSelector()
+       selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: 
"node1"}})
+       selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: 
"node2"}})
+       // load data
+       _, err := selector.Pick("group1", "", 0)
+       assert.NoError(t, err)
+       _, err = selector.Pick("group1", "", 1)
+       assert.NoError(t, err)
+       node1, err := selector.Pick("group1", "", 0)
+       assert.NoError(t, err)
+       node2, err := selector.Pick("group1", "", 1)
+       assert.NoError(t, err)
+       assert.NotEqual(t, node1, node2, "Different shardIDs in the same group 
should not result in the same node")
+}
+
+// TestPickNodeRemoval verifies that a removed node is never picked.
+func TestPickNodeRemoval(t *testing.T) {
+       selector := NewRoundRobinSelector()
+       // Pick starts a cleanup goroutine; stop it when the test ends.
+       defer selector.Close()
+       selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node1"}})
+       selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node2"}})
+       selector.RemoveNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node1"}})
+       node, err := selector.Pick("group1", "", 0)
+       assert.NoError(t, err)
+       assert.Equal(t, "node2", node)
+}
+
+// TestPickConsistentSelectionAfterRemoval verifies that a key's index is kept
+// and re-mapped onto the shrunken node list after a node is removed.
+func TestPickConsistentSelectionAfterRemoval(t *testing.T) {
+       selector := NewRoundRobinSelector()
+       // Pick starts a cleanup goroutine; stop it when the test ends.
+       defer selector.Close()
+       selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node1"}})
+       selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node2"}})
+       selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node3"}})
+       _, err := selector.Pick("group1", "", 0)
+       assert.NoError(t, err)
+       _, err = selector.Pick("group1", "", 1)
+       assert.NoError(t, err)
+       node, err := selector.Pick("group1", "", 1)
+       assert.NoError(t, err)
+       assert.Equal(t, "node2", node)
+       selector.RemoveNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node2"}})
+       node, err = selector.Pick("group1", "", 1)
+       assert.NoError(t, err)
+       assert.Equal(t, "node3", node)
+}
+
+// TestCleanupExpiredEntries verifies that keys idle longer than keyTTL are
+// removed while recently picked keys survive.
+func TestCleanupExpiredEntries(t *testing.T) {
+       mc := timestamp.NewMockClock()
+       mc.Set(time.Date(1970, 0o1, 0o1, 0, 0, 0, 0, time.Local))
+       selector := &roundRobinSelector{
+               nodes:   make([]string, 0),
+               clock:   mc,
+               closeCh: make(chan struct{}),
+       }
+       // The first Pick starts the cleanup goroutine; closing closeCh stops it.
+       defer selector.Close()
+       selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node1"}})
+       selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: "node2"}})
+       _, err := selector.Pick("group1", "", 0)
+       assert.NoError(t, err)
+       _, ok := selector.lookupTable.Load(key{group: "group1", shardID: 0})
+       assert.True(t, ok)
+       mc.Add(25 * time.Hour)
+       _, err = selector.Pick("group1", "", 1)
+       assert.NoError(t, err)
+       _, ok = selector.lookupTable.Load(key{group: "group1", shardID: 1})
+       assert.True(t, ok)
+       selector.cleanupExpiredEntries()
+       _, ok = selector.lookupTable.Load(key{group: "group1", shardID: 0})
+       assert.False(t, ok)
+       _, ok = selector.lookupTable.Load(key{group: "group1", shardID: 1})
+       assert.True(t, ok)
+}
diff --git a/pkg/pb/v1/metadata.go b/pkg/pb/v1/metadata.go
index 77e7360c..1d0405ed 100644
--- a/pkg/pb/v1/metadata.go
+++ b/pkg/pb/v1/metadata.go
@@ -19,6 +19,8 @@
 package v1
 
 import (
+       "context"
+
        "github.com/apache/skywalking-banyandb/api/common"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
@@ -138,7 +140,7 @@ type StreamQueryOptions struct {
 
 // StreamQueryResult is the result of a stream query.
 type StreamQueryResult interface {
+       // Pull returns a *StreamResult. BuildElementsFromStreamResult treats a
+       // nil return as "no data". The context is passed through from the query
+       // executor — NOTE(review): presumably for tracing; confirm cancellation semantics.
-       Pull() *StreamResult
+       Pull(context.Context) *StreamResult
+       // Release presumably frees resources held by the result — confirm
+       // against the implementations.
        Release()
 }
 
diff --git a/pkg/query/logical/stream/stream_plan_distributed.go 
b/pkg/query/logical/stream/stream_plan_distributed.go
index 26380542..55a6b5fb 100644
--- a/pkg/query/logical/stream/stream_plan_distributed.go
+++ b/pkg/query/logical/stream/stream_plan_distributed.go
@@ -31,7 +31,9 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/iter/sort"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query"
        "github.com/apache/skywalking-banyandb/pkg/query/executor"
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
 )
@@ -129,14 +131,28 @@ type distributedPlan struct {
 
 func (t *distributedPlan) Close() {}
 
-func (t *distributedPlan) Execute(ctx context.Context) ([]*streamv1.Element, 
error) {
+func (t *distributedPlan) Execute(ctx context.Context) (ee 
[]*streamv1.Element, err error) {
        dctx := executor.FromDistributedExecutionContext(ctx)
-       query := proto.Clone(t.queryTemplate).(*streamv1.QueryRequest)
-       query.TimeRange = dctx.TimeRange()
+       queryRequest := proto.Clone(t.queryTemplate).(*streamv1.QueryRequest)
+       queryRequest.TimeRange = dctx.TimeRange()
        if t.maxElementSize > 0 {
-               query.Limit = t.maxElementSize
+               queryRequest.Limit = t.maxElementSize
        }
-       ff, err := dctx.Broadcast(defaultQueryTimeout, data.TopicStreamQuery, 
bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query))
+       tracer := query.GetTracer(ctx)
+       var span *query.Span
+       if tracer != nil {
+               span, _ = tracer.StartSpan(ctx, "distributed-client")
+               queryRequest.Trace = true
+               span.Tag("request", 
convert.BytesToString(logger.Proto(queryRequest)))
+               defer func() {
+                       if err != nil {
+                               span.Error(err)
+                       } else {
+                               span.Stop()
+                       }
+               }()
+       }
+       ff, err := dctx.Broadcast(defaultQueryTimeout, data.TopicStreamQuery, 
bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), queryRequest))
        if err != nil {
                return nil, err
        }
@@ -151,6 +167,9 @@ func (t *distributedPlan) Execute(ctx context.Context) 
([]*streamv1.Element, err
                                continue
                        }
                        resp := d.(*streamv1.QueryResponse)
+                       if span != nil {
+                               span.AddSubTrace(resp.Trace)
+                       }
                        see = append(see,
                                newSortableElements(resp.Elements, 
t.sortByTime, t.sortTagSpec))
                }
diff --git a/pkg/query/logical/stream/stream_plan_indexscan_local.go 
b/pkg/query/logical/stream/stream_plan_indexscan_local.go
index 95a83bbc..213774bd 100644
--- a/pkg/query/logical/stream/stream_plan_indexscan_local.go
+++ b/pkg/query/logical/stream/stream_plan_indexscan_local.go
@@ -74,7 +74,7 @@ func (i *localIndexScan) Sort(order *logical.OrderBy) {
 
 func (i *localIndexScan) Execute(ctx context.Context) ([]*streamv1.Element, 
error) {
        if i.result != nil {
-               return BuildElementsFromStreamResult(i.result), nil
+               return BuildElementsFromStreamResult(ctx, i.result), nil
        }
        var orderBy *pbv1.OrderBy
        if i.order != nil {
@@ -99,7 +99,7 @@ func (i *localIndexScan) Execute(ctx context.Context) 
([]*streamv1.Element, erro
        if i.result == nil {
                return nil, nil
        }
-       return BuildElementsFromStreamResult(i.result), nil
+       return BuildElementsFromStreamResult(ctx, i.result), nil
 }
 
 func (i *localIndexScan) String() string {
@@ -120,8 +120,8 @@ func (i *localIndexScan) Schema() logical.Schema {
 }
 
 // BuildElementsFromStreamResult builds a slice of elements from the given 
stream query result.
-func BuildElementsFromStreamResult(result pbv1.StreamQueryResult) (elements 
[]*streamv1.Element) {
-       r := result.Pull()
+func BuildElementsFromStreamResult(ctx context.Context, result 
pbv1.StreamQueryResult) (elements []*streamv1.Element) {
+       r := result.Pull(ctx)
        if r == nil {
                return nil
        }


Reply via email to