This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 285db188 Introduce more query tracing and node selector (#486)
285db188 is described below
commit 285db188e633c6b95b8c5c354e043db79658c147
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon Jul 15 14:24:24 2024 +0800
Introduce more query tracing and node selector (#486)
* Enable tracing stream
* Introduce more query tracing and node selector
Signed-off-by: Gao Hongtao <[email protected]>
---------
Signed-off-by: Gao Hongtao <[email protected]>
---
CHANGES.md | 3 +
api/proto/banyandb/measure/v1/topn.proto | 5 +
banyand/dquery/stream.go | 29 +++-
banyand/liaison/grpc/measure.go | 23 ++-
banyand/liaison/grpc/stream.go | 24 ++-
banyand/measure/trace.go | 1 +
banyand/query/processor.go | 26 ++-
banyand/query/processor_topn.go | 52 +++++-
banyand/stream/benchmark_test.go | 10 +-
banyand/stream/query.go | 54 ++++---
banyand/stream/query_test.go | 3 +-
banyand/{measure => stream}/trace.go | 6 +-
docs/api-reference.md | 2 +
pkg/cmdsetup/liaison.go | 3 +-
pkg/node/interface.go | 3 +
pkg/node/maglev.go | 2 +
pkg/node/round_robin.go | 177 +++++++++++++++++++++
pkg/node/round_robin_test.go | 112 +++++++++++++
pkg/pb/v1/metadata.go | 4 +-
.../logical/stream/stream_plan_distributed.go | 29 +++-
.../logical/stream/stream_plan_indexscan_local.go | 8 +-
21 files changed, 522 insertions(+), 54 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index be310f0e..8ab401c5 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -15,6 +15,9 @@ Release Notes.
- Add the measure query trace.
- Assign a separate lookup table to each group in the maglev selector.
- Convert the async local pipeline to a sync pipeline.
+- Add the stream query trace.
+- Add the topN query trace.
+- Introduce the round-robin selector to Liaison Node.
### Bugs
diff --git a/api/proto/banyandb/measure/v1/topn.proto b/api/proto/banyandb/measure/v1/topn.proto
index f6366b1d..1b349bc6 100644
--- a/api/proto/banyandb/measure/v1/topn.proto
+++ b/api/proto/banyandb/measure/v1/topn.proto
@@ -19,6 +19,7 @@ syntax = "proto3";
package banyandb.measure.v1;
+import "banyandb/common/v1/trace.proto";
import "banyandb/model/v1/common.proto";
import "banyandb/model/v1/query.proto";
import "google/protobuf/timestamp.proto";
@@ -44,6 +45,8 @@ message TopNResponse {
// lists contain a series topN lists ranked by timestamp
// if agg_func in query request is specified, lists' size should be one.
repeated TopNList lists = 1;
+ // trace contains the trace information of the query when trace is enabled
+ common.v1.Trace trace = 2;
}
// TopNRequest is the request contract for query.
@@ -63,4 +66,6 @@ message TopNRequest {
repeated model.v1.Condition conditions = 6;
// field_value_sort indicates how to sort fields
model.v1.Sort field_value_sort = 7;
+ // trace is used to enable trace for the query
+ bool trace = 8;
}
diff --git a/banyand/dquery/stream.go b/banyand/dquery/stream.go
index 5cfacdbd..8931ebb5 100644
--- a/banyand/dquery/stream.go
+++ b/banyand/dquery/stream.go
@@ -19,14 +19,17 @@ package dquery
import (
"context"
+ "errors"
"time"
"github.com/apache/skywalking-banyandb/api/common"
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ measurev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
streamv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/banyand/stream"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/query"
"github.com/apache/skywalking-banyandb/pkg/query/executor"
logical_stream
"github.com/apache/skywalking-banyandb/pkg/query/logical/stream"
)
@@ -38,7 +41,8 @@ type streamQueryProcessor struct {
}
func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
- now := time.Now().UnixNano()
+ n := time.Now()
+ now := n.UnixNano()
queryCriteria, ok := message.Data().(*streamv1.QueryRequest)
if !ok {
resp = bus.NewMessage(bus.MessageID(now),
common.NewError("invalid event data type"))
@@ -76,9 +80,30 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
if p.log.Debug().Enabled() {
p.log.Debug().Str("plan", plan.String()).Msg("query plan")
}
+ ctx := context.Background()
+ var tracer *query.Tracer
+ var span *query.Span
+ if queryCriteria.Trace {
+ tracer, ctx = query.NewTracer(ctx, n.Format(time.RFC3339Nano))
+ span, ctx = tracer.StartSpan(ctx, "distributed-%s",
p.queryService.nodeID)
+ span.Tag("plan", plan.String())
+ defer func() {
+ data := resp.Data()
+ switch d := data.(type) {
+ case *streamv1.QueryResponse:
+ d.Trace = tracer.ToProto()
+ case common.Error:
+ span.Error(errors.New(d.Msg()))
+ resp = bus.NewMessage(bus.MessageID(now),
&measurev1.QueryResponse{Trace: tracer.ToProto()})
+ default:
+ panic("unexpected data type")
+ }
+ span.Stop()
+ }()
+ }
se := plan.(executor.StreamExecutable)
defer se.Close()
- entities, err :=
se.Execute(executor.WithDistributedExecutionContext(context.Background(),
&distributedContext{
+ entities, err :=
se.Execute(executor.WithDistributedExecutionContext(ctx, &distributedContext{
Broadcaster: p.broadcaster,
timeRange: queryCriteria.TimeRange,
}))
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index 175c687c..6bedb050 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -188,12 +188,27 @@ func (ms *measureService) Query(_ context.Context, req *measurev1.QueryRequest)
return nil, nil
}
-func (ms *measureService) TopN(_ context.Context, topNRequest
*measurev1.TopNRequest) (*measurev1.TopNResponse, error) {
- if err := timestamp.CheckTimeRange(topNRequest.GetTimeRange()); err !=
nil {
+func (ms *measureService) TopN(_ context.Context, topNRequest
*measurev1.TopNRequest) (resp *measurev1.TopNResponse, err error) {
+ if err = timestamp.CheckTimeRange(topNRequest.GetTimeRange()); err !=
nil {
return nil, status.Errorf(codes.InvalidArgument, "%v is invalid
:%s", topNRequest.GetTimeRange(), err)
}
-
- message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()),
topNRequest)
+ now := time.Now()
+ if topNRequest.Trace {
+ ctx := context.TODO()
+ tracer, _ := query.NewTracer(ctx, now.Format(time.RFC3339Nano))
+ span, _ := tracer.StartSpan(ctx, "topn-grpc")
+ span.Tag("request",
convert.BytesToString(logger.Proto(topNRequest)))
+ defer func() {
+ if err != nil {
+ span.Error(err)
+ } else {
+ span.AddSubTrace(resp.Trace)
+ resp.Trace = tracer.ToProto()
+ }
+ span.Stop()
+ }()
+ }
+ message := bus.NewMessage(bus.MessageID(now.UnixNano()), topNRequest)
feat, errQuery := ms.broadcaster.Publish(data.TopicTopNQuery, message)
if errQuery != nil {
return nil, errQuery
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index 0a8946cf..7ee22256 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -34,8 +34,10 @@ import (
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/accesslog"
"github.com/apache/skywalking-banyandb/pkg/bus"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/query"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
@@ -139,15 +141,31 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {
var emptyStreamQueryResponse = &streamv1.QueryResponse{Elements:
make([]*streamv1.Element, 0)}
-func (s *streamService) Query(_ context.Context, req *streamv1.QueryRequest)
(*streamv1.QueryResponse, error) {
+func (s *streamService) Query(_ context.Context, req *streamv1.QueryRequest)
(resp *streamv1.QueryResponse, err error) {
timeRange := req.GetTimeRange()
if timeRange == nil {
req.TimeRange = timestamp.DefaultTimeRange
}
- if err := timestamp.CheckTimeRange(req.GetTimeRange()); err != nil {
+ if err = timestamp.CheckTimeRange(req.GetTimeRange()); err != nil {
return nil, status.Errorf(codes.InvalidArgument, "%v is invalid
:%s", req.GetTimeRange(), err)
}
- message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), req)
+ now := time.Now()
+ if req.Trace {
+ ctx := context.TODO()
+ tracer, _ := query.NewTracer(ctx, now.Format(time.RFC3339Nano))
+ span, _ := tracer.StartSpan(ctx, "stream-grpc")
+ span.Tag("request", convert.BytesToString(logger.Proto(req)))
+ defer func() {
+ if err != nil {
+ span.Error(err)
+ } else {
+ span.AddSubTrace(resp.Trace)
+ resp.Trace = tracer.ToProto()
+ }
+ span.Stop()
+ }()
+ }
+ message := bus.NewMessage(bus.MessageID(now.UnixNano()), req)
feat, errQuery := s.broadcaster.Publish(data.TopicStreamQuery, message)
if errQuery != nil {
if errors.Is(errQuery, io.EOF) {
diff --git a/banyand/measure/trace.go b/banyand/measure/trace.go
index ccf51f3a..42d226cd 100644
--- a/banyand/measure/trace.go
+++ b/banyand/measure/trace.go
@@ -59,6 +59,7 @@ func startBlockScanSpan(ctx context.Context, sids int, parts []*part, qr *queryR
span, _ := tracer.StartSpan(ctx, "scan-blocks")
span.Tag("series_num", fmt.Sprintf("%d", sids))
span.Tag("part_header", partMetadataHeader)
+ span.Tag("part_num", fmt.Sprintf("%d", len(parts)))
for i := range parts {
span.Tag(fmt.Sprintf("part_%d_%s", parts[i].partMetadata.ID,
parts[i].path),
parts[i].partMetadata.String())
diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index 7eb49d07..f1464e2c 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -71,7 +71,8 @@ type streamQueryProcessor struct {
}
func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
- now := time.Now().UnixNano()
+ n := time.Now()
+ now := n.UnixNano()
queryCriteria, ok := message.Data().(*streamv1.QueryRequest)
if !ok {
resp = bus.NewMessage(bus.MessageID(now),
common.NewError("invalid event data type"))
@@ -115,9 +116,30 @@ func (p *streamQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
if p.log.Debug().Enabled() {
p.log.Debug().Str("plan", plan.String()).Msg("query plan")
}
+ ctx := context.Background()
+ var tracer *query.Tracer
+ var span *query.Span
+ if queryCriteria.Trace {
+ tracer, ctx = query.NewTracer(ctx, n.Format(time.RFC3339Nano))
+ span, ctx = tracer.StartSpan(ctx, "data-%s",
p.queryService.nodeID)
+ span.Tag("plan", plan.String())
+ defer func() {
+ data := resp.Data()
+ switch d := data.(type) {
+ case *streamv1.QueryResponse:
+ d.Trace = tracer.ToProto()
+ case common.Error:
+ span.Error(errors.New(d.Msg()))
+ resp = bus.NewMessage(bus.MessageID(now),
&measurev1.QueryResponse{Trace: tracer.ToProto()})
+ default:
+ panic("unexpected data type")
+ }
+ span.Stop()
+ }()
+ }
se := plan.(executor.StreamExecutable)
defer se.Close()
- entities, err :=
se.Execute(executor.WithStreamExecutionContext(context.Background(), ec))
+ entities, err := se.Execute(executor.WithStreamExecutionContext(ctx,
ec))
if err != nil {
p.log.Error().Err(err).RawJSON("req",
logger.Proto(queryCriteria)).Msg("fail to execute the query plan")
resp = bus.NewMessage(bus.MessageID(now),
common.NewError("execute the query plan for stream %s: %v", meta.GetName(),
err))
diff --git a/banyand/query/processor_topn.go b/banyand/query/processor_topn.go
index d734e18b..7ebb1d48 100644
--- a/banyand/query/processor_topn.go
+++ b/banyand/query/processor_topn.go
@@ -20,6 +20,8 @@ package query
import (
"container/heap"
"context"
+ "errors"
+ "fmt"
"slices"
"time"
@@ -34,6 +36,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/flow"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/query"
"github.com/apache/skywalking-banyandb/pkg/query/aggregation"
"github.com/apache/skywalking-banyandb/pkg/query/executor"
logical_measure
"github.com/apache/skywalking-banyandb/pkg/query/logical/measure"
@@ -46,7 +49,8 @@ type topNQueryProcessor struct {
func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
request, ok := message.Data().(*measurev1.TopNRequest)
- now := time.Now().UnixNano()
+ n := time.Now()
+ now := n.UnixNano()
if !ok {
t.log.Warn().Msg("invalid event data type")
return
@@ -114,8 +118,28 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
if e := ml.Debug(); e.Enabled() {
e.Str("plan", plan.String()).Msg("topn plan")
}
-
- mIterator, err :=
plan.(executor.MeasureExecutable).Execute(executor.WithMeasureExecutionContext(context.Background(),
topNResultMeasure))
+ ctx := context.Background()
+ var tracer *query.Tracer
+ var span *query.Span
+ if request.Trace {
+ tracer, ctx = query.NewTracer(ctx, n.Format(time.RFC3339Nano))
+ span, ctx = tracer.StartSpan(ctx, "data-%s",
t.queryService.nodeID)
+ span.Tag("plan", plan.String())
+ defer func() {
+ data := resp.Data()
+ switch d := data.(type) {
+ case *measurev1.TopNResponse:
+ d.Trace = tracer.ToProto()
+ case common.Error:
+ span.Error(errors.New(d.Msg()))
+ resp = bus.NewMessage(bus.MessageID(now),
&measurev1.QueryResponse{Trace: tracer.ToProto()})
+ default:
+ panic("unexpected data type")
+ }
+ span.Stop()
+ }()
+ }
+ mIterator, err :=
plan.(executor.MeasureExecutable).Execute(executor.WithMeasureExecutionContext(ctx,
topNResultMeasure))
if err != nil {
ml.Error().Err(err).RawJSON("req",
logger.Proto(request)).Msg("fail to close the topn plan")
resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail
to execute the topn plan for measure %s: %v", topNMetadata.GetName(), err))
@@ -128,12 +152,24 @@ func (t *topNQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
}()
result := make([]*measurev1.DataPoint, 0)
- for mIterator.Next() {
- current := mIterator.Current()
- if len(current) > 0 {
- result = append(result, current[0])
+ func() {
+ var r int
+ if tracer != nil {
+ iterSpan, _ := tracer.StartSpan(ctx, "iterator")
+ defer func() {
+ iterSpan.Tag("rounds", fmt.Sprintf("%d", r))
+ iterSpan.Tag("size", fmt.Sprintf("%d",
len(result)))
+ iterSpan.Stop()
+ }()
}
- }
+ for mIterator.Next() {
+ r++
+ current := mIterator.Current()
+ if len(current) > 0 {
+ result = append(result, current[0])
+ }
+ }
+ }()
resp = bus.NewMessage(bus.MessageID(now), toTopNResponse(result))
return
diff --git a/banyand/stream/benchmark_test.go b/banyand/stream/benchmark_test.go
index 823116bc..f2fb7c34 100644
--- a/banyand/stream/benchmark_test.go
+++ b/banyand/stream/benchmark_test.go
@@ -344,6 +344,7 @@ func generateStreamQueryOptions(p parameter, index mockIndex) pbv1.StreamQueryOp
func BenchmarkFilter(b *testing.B) {
b.ReportAllocs()
+ ctx := context.TODO()
for _, p := range pList {
esList, docsList, idx := generateData(p)
db := write(b, p, esList, docsList)
@@ -351,24 +352,25 @@ func BenchmarkFilter(b *testing.B) {
sqo := generateStreamQueryOptions(p, idx)
sqo.Order = nil
b.Run("filter-"+p.scenario, func(b *testing.B) {
- res, err := s.Query(context.TODO(), sqo)
+ res, err := s.Query(ctx, sqo)
require.NoError(b, err)
- logicalstream.BuildElementsFromStreamResult(res)
+ logicalstream.BuildElementsFromStreamResult(ctx, res)
})
}
}
func BenchmarkSort(b *testing.B) {
b.ReportAllocs()
+ ctx := context.TODO()
for _, p := range pList {
esList, docsList, idx := generateData(p)
db := write(b, p, esList, docsList)
s := generateStream(db)
sqo := generateStreamQueryOptions(p, idx)
b.Run("sort-"+p.scenario, func(b *testing.B) {
- res, err := s.Query(context.TODO(), sqo)
+ res, err := s.Query(ctx, sqo)
require.NoError(b, err)
- logicalstream.BuildElementsFromStreamResult(res)
+ logicalstream.BuildElementsFromStreamResult(ctx, res)
})
}
}
diff --git a/banyand/stream/query.go b/banyand/stream/query.go
index 8bd1f105..ef7925b9 100644
--- a/banyand/stream/query.go
+++ b/banyand/stream/query.go
@@ -36,6 +36,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/query"
"github.com/apache/skywalking-banyandb/pkg/query/logical"
)
@@ -136,24 +137,24 @@ type queryResult struct {
asc bool
}
-func (qr *queryResult) Pull() *pbv1.StreamResult {
+func (qr *queryResult) Pull(ctx context.Context) *pbv1.StreamResult {
if qr.sortingIter == nil {
qo := qr.qo
sort.Slice(qo.sortedSids, func(i, j int) bool { return
qo.sortedSids[i] < qo.sortedSids[j] })
- return qr.load(qo)
+ return qr.load(ctx, qo)
}
if !qr.loaded {
qr.elementIDsSorted = make([]uint64, 0, qr.qo.MaxElementSize)
- return qr.loadSortingData()
+ return qr.loadSortingData(ctx)
}
if v := qr.nextValue(); v != nil {
return v
}
qr.loaded = false
- return qr.loadSortingData()
+ return qr.loadSortingData(ctx)
}
-func (qr *queryResult) scanParts(qo queryOptions) error {
+func (qr *queryResult) scanParts(ctx context.Context, qo queryOptions) error {
var parts []*part
var n int
for i := range qr.tabWrappers {
@@ -170,6 +171,8 @@ func (qr *queryResult) scanParts(qo queryOptions) error {
}
bma := generateBlockMetadataArray()
defer releaseBlockMetadataArray(bma)
+ defFn := startBlockScanSpan(ctx, len(qo.sortedSids), parts, qr)
+ defer defFn()
// TODO: cache tstIter
var ti tstIter
defer ti.reset()
@@ -190,9 +193,9 @@ func (qr *queryResult) scanParts(qo queryOptions) error {
return nil
}
-func (qr *queryResult) load(qo queryOptions) *pbv1.StreamResult {
+func (qr *queryResult) load(ctx context.Context, qo queryOptions)
*pbv1.StreamResult {
if !qr.loaded {
- if err := qr.scanParts(qo); err != nil {
+ if err := qr.scanParts(ctx, qo); err != nil {
return &pbv1.StreamResult{
Error: err,
}
@@ -293,16 +296,34 @@ func (qr *queryResult) nextValue() *pbv1.StreamResult {
return qr.mergeByTimestamp()
}
-func (qr *queryResult) loadSortingData() *pbv1.StreamResult {
+func (qr *queryResult) loadSortingData(ctx context.Context) *pbv1.StreamResult
{
var qo queryOptions
qo.StreamQueryOptions = qr.qo.StreamQueryOptions
- if qr.qo.elementFilter != nil {
- qo.elementFilter = roaring.NewPostingList()
- }
+ qo.elementFilter = roaring.NewPostingList()
qo.seriesToEntity = qr.qo.seriesToEntity
qr.elementIDsSorted = qr.elementIDsSorted[:0]
- for count := 1; qr.sortingIter.Next(); count++ {
+ count, searchedSize := 1, 0
+ tracer := query.GetTracer(ctx)
+ if tracer != nil {
+ span, _ := tracer.StartSpan(ctx, "load-sorting-data")
+ span.Tagf("max_element_size", "%d", qo.MaxElementSize)
+ if qr.qo.elementFilter != nil {
+ span.Tag("filter_size", fmt.Sprintf("%d",
qr.qo.elementFilter.Len()))
+ }
+ defer func() {
+ span.Tagf("searched_size", "%d", searchedSize)
+ span.Tagf("count", "%d", count)
+ span.Stop()
+ }()
+ }
+ for ; qr.sortingIter.Next(); count++ {
+ searchedSize++
val := qr.sortingIter.Val()
+ if qr.qo.elementFilter != nil &&
!qr.qo.elementFilter.Contains(val.DocID) {
+ count--
+ continue
+ }
+ qo.elementFilter.Insert(val.DocID)
if val.Timestamp > qo.maxTimestamp {
qo.maxTimestamp = val.Timestamp
}
@@ -310,9 +331,6 @@ func (qr *queryResult) loadSortingData() *pbv1.StreamResult {
qo.minTimestamp = val.Timestamp
}
qr.elementIDsSorted = append(qr.elementIDsSorted, val.DocID)
- if qo.elementFilter != nil {
- qo.elementFilter.Insert(val.DocID)
- }
// Insertion sort
insertPos, found := -1, false
@@ -338,10 +356,10 @@ func (qr *queryResult) loadSortingData() *pbv1.StreamResult {
break
}
}
- if qo.elementFilter != nil {
- _ = qo.elementFilter.Intersect(qr.qo.elementFilter)
+ if qo.elementFilter.IsEmpty() {
+ return nil
}
- return qr.load(qo)
+ return qr.load(ctx, qo)
}
func (qr *queryResult) releaseParts() {
diff --git a/banyand/stream/query_test.go b/banyand/stream/query_test.go
index dcfb6d5c..8cc5236d 100644
--- a/banyand/stream/query_test.go
+++ b/banyand/stream/query_test.go
@@ -315,8 +315,9 @@ func TestQueryResult(t *testing.T) {
result.asc = tt.ascTS
}
var got []pbv1.StreamResult
+ ctx := context.Background()
for {
- r := result.Pull()
+ r := result.Pull(ctx)
if r == nil {
break
}
diff --git a/banyand/measure/trace.go b/banyand/stream/trace.go
similarity index 93%
copy from banyand/measure/trace.go
copy to banyand/stream/trace.go
index ccf51f3a..ed428a67 100644
--- a/banyand/measure/trace.go
+++ b/banyand/stream/trace.go
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package measure
+package stream
import (
"context"
@@ -57,8 +57,12 @@ func startBlockScanSpan(ctx context.Context, sids int, parts []*part, qr *queryR
}
span, _ := tracer.StartSpan(ctx, "scan-blocks")
+ if qr.qo.elementFilter != nil {
+ span.Tag("filter_size", fmt.Sprintf("%d",
qr.qo.elementFilter.Len()))
+ }
span.Tag("series_num", fmt.Sprintf("%d", sids))
span.Tag("part_header", partMetadataHeader)
+ span.Tag("part_num", fmt.Sprintf("%d", len(parts)))
for i := range parts {
span.Tag(fmt.Sprintf("part_%d_%s", parts[i].partMetadata.ID,
parts[i].path),
parts[i].partMetadata.String())
diff --git a/docs/api-reference.md b/docs/api-reference.md
index b837540d..2495c817 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -2561,6 +2561,7 @@ TopNRequest is the request contract for query.
| agg | [banyandb.model.v1.AggregationFunction](#banyandb-model-v1-AggregationFunction) | | agg aggregates lists grouped by field names in the time_range TODO validate enum defined_only |
| conditions | [banyandb.model.v1.Condition](#banyandb-model-v1-Condition) | repeated | criteria select counters. Only equals are acceptable. |
| field_value_sort | [banyandb.model.v1.Sort](#banyandb-model-v1-Sort) | | field_value_sort indicates how to sort fields |
+| trace | [bool](#bool) | | trace is used to enable trace for the query |
@@ -2576,6 +2577,7 @@ TopNResponse is the response for a query to the Query module.
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| lists | [TopNList](#banyandb-measure-v1-TopNList) | repeated | lists contain a series topN lists ranked by timestamp if agg_func in query request is specified, lists' size should be one. |
+| trace | [banyandb.common.v1.Trace](#banyandb-common-v1-Trace) | | trace contains the trace information of the query when trace is enabled |
diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go
index 67cd942a..460c8544 100644
--- a/pkg/cmdsetup/liaison.go
+++ b/pkg/cmdsetup/liaison.go
@@ -46,7 +46,7 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
}
pipeline := pub.New(metaSvc)
localPipeline := queue.Local()
- nodeSel := node.NewMaglevSelector()
+ nodeSel := node.NewRoundRobinSelector()
nodeRegistry := grpc.NewClusterNodeRegistry(pipeline, nodeSel)
grpcServer := grpc.NewServer(ctx, pipeline, localPipeline, metaSvc,
nodeRegistry)
profSvc := observability.NewProfService()
@@ -77,6 +77,7 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
Version: version.Build(),
Short: "Run as the liaison server",
RunE: func(_ *cobra.Command, _ []string) (err error) {
+ defer nodeSel.Close()
node, err := common.GenerateNode(grpcServer.GetPort(),
httpServer.GetPort())
if err != nil {
return err
diff --git a/pkg/node/interface.go b/pkg/node/interface.go
index 26ec5cc2..1a4f5882 100644
--- a/pkg/node/interface.go
+++ b/pkg/node/interface.go
@@ -39,6 +39,7 @@ type Selector interface {
AddNode(node *databasev1.Node)
RemoveNode(node *databasev1.Node)
Pick(group, name string, shardID uint32) (string, error)
+ Close()
}
// NewPickFirstSelector returns a simple selector that always returns the first node if exists.
@@ -55,6 +56,8 @@ type pickFirstSelector struct {
mu sync.RWMutex
}
+func (p *pickFirstSelector) Close() {}
+
func (p *pickFirstSelector) AddNode(node *databasev1.Node) {
nodeID := node.GetMetadata().GetName()
p.mu.RLock()
diff --git a/pkg/node/maglev.go b/pkg/node/maglev.go
index fea2c5b2..4ae21740 100644
--- a/pkg/node/maglev.go
+++ b/pkg/node/maglev.go
@@ -37,6 +37,8 @@ type maglevSelector struct {
mutex sync.RWMutex
}
+func (m *maglevSelector) Close() {}
+
func (m *maglevSelector) AddNode(node *databasev1.Node) {
m.mutex.Lock()
defer m.mutex.Unlock()
diff --git a/pkg/node/round_robin.go b/pkg/node/round_robin.go
new file mode 100644
index 00000000..da85c210
--- /dev/null
+++ b/pkg/node/round_robin.go
@@ -0,0 +1,177 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package node
+
+import (
+ "fmt"
+ "slices"
+ "sort"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/pkg/errors"
+
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+const (
+ expiredKeyCleanupInterval = 1 * time.Hour
+ keyTTL = 24 * time.Hour
+)
+
+type roundRobinSelector struct {
+ clock timestamp.Clock
+ closeCh chan struct{}
+ lookupTable sync.Map
+ nodes []string
+ mu sync.RWMutex
+ once sync.Once
+ tMu sync.Mutex
+}
+
+func (r *roundRobinSelector) Close() {
+ close(r.closeCh)
+}
+
+// NewRoundRobinSelector creates a new round-robin selector.
+func NewRoundRobinSelector() Selector {
+ rrs := &roundRobinSelector{
+ nodes: make([]string, 0),
+ clock: timestamp.NewClock(),
+ closeCh: make(chan struct{}),
+ }
+ return rrs
+}
+
+func (r *roundRobinSelector) AddNode(node *databasev1.Node) {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ r.nodes = append(r.nodes, node.Metadata.Name)
+ sort.StringSlice(r.nodes).Sort()
+}
+
+func (r *roundRobinSelector) RemoveNode(node *databasev1.Node) {
+ r.mu.Lock()
+ defer r.mu.Unlock()
+ for i, n := range r.nodes {
+ if n == node.Metadata.Name {
+ r.nodes = append(r.nodes[:i], r.nodes[i+1:]...)
+ break
+ }
+ }
+}
+
+func (r *roundRobinSelector) Pick(group, _ string, shardID uint32) (string,
error) {
+ r.mu.RLock()
+ defer r.mu.RUnlock()
+ k := key{group: group, shardID: shardID}
+ if len(r.nodes) == 0 {
+ return "", errors.New("no nodes available")
+ }
+ entry, ok := r.lookupTable.Load(k)
+ if ok {
+ return r.selectNode(entry), nil
+ }
+ r.tMu.Lock()
+ defer r.tMu.Unlock()
+ if entry, ok := r.lookupTable.Load(k); ok {
+ return r.selectNode(entry), nil
+ }
+
+ keys := []key{k}
+ r.lookupTable.Range(func(k, _ any) bool {
+ keys = append(keys, k.(key))
+ return true
+ })
+ slices.SortFunc(keys, func(a, b key) int {
+ n := strings.Compare(a.group, b.group)
+ if n != 0 {
+ return n
+ }
+ return int(a.shardID) - int(b.shardID)
+ })
+ for i := range keys {
+ if entry, ok := r.lookupTable.Load(keys[i]); ok {
+ entry.(*tableEntry).index = i
+ } else {
+ r.lookupTable.Store(keys[i], r.newTableEntry(i))
+ }
+ }
+ r.once.Do(r.startCleanupTicker)
+ if entry, ok := r.lookupTable.Load(k); ok {
+ return r.selectNode(entry), nil
+ }
+ panic(fmt.Sprintf("key %v not found", k))
+}
+
+func (r *roundRobinSelector) selectNode(entry any) string {
+ e := entry.(*tableEntry)
+ now := r.clock.Now()
+ e.lastAccess.Store(&now)
+ return r.nodes[e.index%len(r.nodes)]
+}
+
+type key struct {
+ group string
+ shardID uint32
+}
+
+type tableEntry struct {
+ lastAccess *atomic.Pointer[time.Time]
+ index int
+}
+
+func (r *roundRobinSelector) newTableEntry(index int) *tableEntry {
+ p := atomic.Pointer[time.Time]{}
+ now := r.clock.Now()
+ p.Store(&now)
+ return &tableEntry{
+ index: index,
+ lastAccess: &p,
+ }
+}
+
+func (r *roundRobinSelector) cleanupExpiredEntries() {
+ now := r.clock.Now()
+ r.tMu.Lock()
+ defer r.tMu.Unlock()
+
+ r.lookupTable.Range(func(k, value any) bool {
+ e := value.(*tableEntry)
+ if now.Sub(*e.lastAccess.Load()) > keyTTL {
+ r.lookupTable.Delete(k)
+ }
+ return true
+ })
+}
+
+func (r *roundRobinSelector) startCleanupTicker() {
+ ticker := r.clock.Ticker(expiredKeyCleanupInterval)
+ go func() {
+ select {
+ case <-r.closeCh:
+ ticker.Stop()
+ return
+ case <-ticker.C:
+ r.cleanupExpiredEntries()
+ }
+ }()
+}
diff --git a/pkg/node/round_robin_test.go b/pkg/node/round_robin_test.go
new file mode 100644
index 00000000..69c1822a
--- /dev/null
+++ b/pkg/node/round_robin_test.go
@@ -0,0 +1,112 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package node
+
+import (
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+func TestPickEmptySelector(t *testing.T) {
+ selector := NewRoundRobinSelector()
+ _, err := selector.Pick("group1", "", 0)
+ assert.Error(t, err)
+}
+
+func TestPickSingleSelection(t *testing.T) {
+ selector := NewRoundRobinSelector()
+ selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name:
"node1"}})
+ node, err := selector.Pick("group1", "", 0)
+ assert.NoError(t, err)
+ assert.Equal(t, "node1", node)
+}
+
+func TestPickMultipleSelections(t *testing.T) {
+ selector := NewRoundRobinSelector()
+ selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name:
"node1"}})
+ selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name:
"node2"}})
+ // load data
+ _, err := selector.Pick("group1", "", 0)
+ assert.NoError(t, err)
+ _, err = selector.Pick("group1", "", 1)
+ assert.NoError(t, err)
+ node1, err := selector.Pick("group1", "", 0)
+ assert.NoError(t, err)
+ node2, err := selector.Pick("group1", "", 1)
+ assert.NoError(t, err)
+ assert.NotEqual(t, node1, node2, "Different shardIDs in the same group
should not result in the same node")
+}
+
+func TestPickNodeRemoval(t *testing.T) {
+ selector := NewRoundRobinSelector()
+ selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name:
"node1"}})
+ selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name:
"node2"}})
+ selector.RemoveNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name:
"node1"}})
+ node, err := selector.Pick("group1", "", 0)
+ assert.NoError(t, err)
+ assert.Equal(t, "node2", node)
+}
+
+func TestPickConsistentSelectionAfterRemoval(t *testing.T) {
+ selector := NewRoundRobinSelector()
+ selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name:
"node1"}})
+ selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name:
"node2"}})
+ selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name:
"node3"}})
+ _, err := selector.Pick("group1", "", 0)
+ assert.NoError(t, err)
+ _, err = selector.Pick("group1", "", 1)
+ assert.NoError(t, err)
+ node, err := selector.Pick("group1", "", 1)
+ assert.NoError(t, err)
+ assert.Equal(t, "node2", node)
+ selector.RemoveNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name:
"node2"}})
+ node, err = selector.Pick("group1", "", 1)
+ assert.NoError(t, err)
+ assert.Equal(t, "node3", node)
+}
+
+func TestCleanupExpiredEntries(t *testing.T) {
+ mc := timestamp.NewMockClock()
+ mc.Set(time.Date(1970, 0o1, 0o1, 0, 0, 0, 0, time.Local))
+ selector := &roundRobinSelector{
+ nodes: make([]string, 0),
+ clock: mc,
+ }
+ selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name:
"node1"}})
+ selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name:
"node2"}})
+ _, err := selector.Pick("group1", "", 0)
+ assert.NoError(t, err)
+ _, ok := selector.lookupTable.Load(key{group: "group1", shardID: 0})
+ assert.True(t, ok)
+ mc.Add(25 * time.Hour)
+ _, err = selector.Pick("group1", "", 1)
+ assert.NoError(t, err)
+ _, ok = selector.lookupTable.Load(key{group: "group1", shardID: 1})
+ assert.True(t, ok)
+ selector.cleanupExpiredEntries()
+ _, ok = selector.lookupTable.Load(key{group: "group1", shardID: 0})
+ assert.False(t, ok)
+ _, ok = selector.lookupTable.Load(key{group: "group1", shardID: 1})
+ assert.True(t, ok)
+}
diff --git a/pkg/pb/v1/metadata.go b/pkg/pb/v1/metadata.go
index 77e7360c..1d0405ed 100644
--- a/pkg/pb/v1/metadata.go
+++ b/pkg/pb/v1/metadata.go
@@ -19,6 +19,8 @@
package v1
import (
+ "context"
+
"github.com/apache/skywalking-banyandb/api/common"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
@@ -138,7 +140,7 @@ type StreamQueryOptions struct {
// StreamQueryResult is the result of a stream query.
type StreamQueryResult interface {
- Pull() *StreamResult
+ Pull(context.Context) *StreamResult
Release()
}
diff --git a/pkg/query/logical/stream/stream_plan_distributed.go
b/pkg/query/logical/stream/stream_plan_distributed.go
index 26380542..55a6b5fb 100644
--- a/pkg/query/logical/stream/stream_plan_distributed.go
+++ b/pkg/query/logical/stream/stream_plan_distributed.go
@@ -31,7 +31,9 @@ import (
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/iter/sort"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/query"
"github.com/apache/skywalking-banyandb/pkg/query/executor"
"github.com/apache/skywalking-banyandb/pkg/query/logical"
)
@@ -129,14 +131,28 @@ type distributedPlan struct {
func (t *distributedPlan) Close() {}
-func (t *distributedPlan) Execute(ctx context.Context) ([]*streamv1.Element,
error) {
+func (t *distributedPlan) Execute(ctx context.Context) (ee
[]*streamv1.Element, err error) {
dctx := executor.FromDistributedExecutionContext(ctx)
- query := proto.Clone(t.queryTemplate).(*streamv1.QueryRequest)
- query.TimeRange = dctx.TimeRange()
+ queryRequest := proto.Clone(t.queryTemplate).(*streamv1.QueryRequest)
+ queryRequest.TimeRange = dctx.TimeRange()
if t.maxElementSize > 0 {
- query.Limit = t.maxElementSize
+ queryRequest.Limit = t.maxElementSize
}
- ff, err := dctx.Broadcast(defaultQueryTimeout, data.TopicStreamQuery,
bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query))
+ tracer := query.GetTracer(ctx)
+ var span *query.Span
+ if tracer != nil {
+ span, _ = tracer.StartSpan(ctx, "distributed-client")
+ queryRequest.Trace = true
+ span.Tag("request",
convert.BytesToString(logger.Proto(queryRequest)))
+ defer func() {
+ if err != nil {
+ span.Error(err)
+ } else {
+ span.Stop()
+ }
+ }()
+ }
+ ff, err := dctx.Broadcast(defaultQueryTimeout, data.TopicStreamQuery,
bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), queryRequest))
if err != nil {
return nil, err
}
@@ -151,6 +167,9 @@ func (t *distributedPlan) Execute(ctx context.Context)
([]*streamv1.Element, err
continue
}
resp := d.(*streamv1.QueryResponse)
+ if span != nil {
+ span.AddSubTrace(resp.Trace)
+ }
see = append(see,
newSortableElements(resp.Elements,
t.sortByTime, t.sortTagSpec))
}
diff --git a/pkg/query/logical/stream/stream_plan_indexscan_local.go
b/pkg/query/logical/stream/stream_plan_indexscan_local.go
index 95a83bbc..213774bd 100644
--- a/pkg/query/logical/stream/stream_plan_indexscan_local.go
+++ b/pkg/query/logical/stream/stream_plan_indexscan_local.go
@@ -74,7 +74,7 @@ func (i *localIndexScan) Sort(order *logical.OrderBy) {
func (i *localIndexScan) Execute(ctx context.Context) ([]*streamv1.Element,
error) {
if i.result != nil {
- return BuildElementsFromStreamResult(i.result), nil
+ return BuildElementsFromStreamResult(ctx, i.result), nil
}
var orderBy *pbv1.OrderBy
if i.order != nil {
@@ -99,7 +99,7 @@ func (i *localIndexScan) Execute(ctx context.Context)
([]*streamv1.Element, erro
if i.result == nil {
return nil, nil
}
- return BuildElementsFromStreamResult(i.result), nil
+ return BuildElementsFromStreamResult(ctx, i.result), nil
}
func (i *localIndexScan) String() string {
@@ -120,8 +120,8 @@ func (i *localIndexScan) Schema() logical.Schema {
}
// BuildElementsFromStreamResult builds a slice of elements from the given
stream query result.
-func BuildElementsFromStreamResult(result pbv1.StreamQueryResult) (elements
[]*streamv1.Element) {
- r := result.Pull()
+func BuildElementsFromStreamResult(ctx context.Context, result
pbv1.StreamQueryResult) (elements []*streamv1.Element) {
+ r := result.Pull(ctx)
if r == nil {
return nil
}