This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 065d7df8 Add the measure query trace system (#476)
065d7df8 is described below
commit 065d7df820ca240c4c91bab7cb614d5d9693c643
Author: Gao Hongtao <[email protected]>
AuthorDate: Wed Jun 26 07:00:45 2024 +0800
Add the measure query trace system (#476)
---
CHANGES.md | 1 +
api/proto/banyandb/common/v1/trace.proto | 2 +
banyand/dquery/dquery.go | 15 +-
banyand/dquery/measure.go | 60 ++++++--
banyand/internal/storage/index.go | 85 +++++++++--
banyand/liaison/grpc/measure.go | 37 +++--
banyand/measure/introducer.go | 15 +-
banyand/measure/query.go | 90 ++++++------
banyand/measure/trace.go | 74 ++++++++++
banyand/metadata/client.go | 6 +-
banyand/metadata/embeddedserver/server.go | 6 +-
banyand/metadata/metadata_test.go | 14 +-
banyand/query/processor.go | 85 +++++++++--
banyand/stream/introducer.go | 15 +-
docs/api-reference.md | 1 +
pkg/query/doc.go | 19 +++
pkg/query/logical/index_filter.go | 24 +++-
.../logical/measure/measure_plan_distributed.go | 37 +++--
.../measure/measure_plan_indexscan_local.go | 35 ++++-
pkg/query/tracer.go | 155 +++++++++++++++++++++
pkg/query/tracer_test.go | 131 +++++++++++++++++
pkg/test/stream/etcd.go | 3 +
test/stress/trace/docker-compose-cluster.yaml | 6 +-
23 files changed, 791 insertions(+), 125 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 13a0e2b6..82ca6e08 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -8,6 +8,7 @@ Release Notes.
- Check unregistered nodes in background.
- Improve sorting performance of stream.
+- Add the measure query trace.
### Bugs
diff --git a/api/proto/banyandb/common/v1/trace.proto
b/api/proto/banyandb/common/v1/trace.proto
index 73a4351d..b2dfc870 100644
--- a/api/proto/banyandb/common/v1/trace.proto
+++ b/api/proto/banyandb/common/v1/trace.proto
@@ -48,6 +48,8 @@ message Span {
string message = 5;
// children is a list of child spans of the span.
repeated Span children = 6;
+ // duration is the duration of the span.
+ int64 duration = 7;
}
// Tag is the key-value pair of a span.
diff --git a/banyand/dquery/dquery.go b/banyand/dquery/dquery.go
index c7e1454d..d378febb 100644
--- a/banyand/dquery/dquery.go
+++ b/banyand/dquery/dquery.go
@@ -20,9 +20,11 @@ package dquery
import (
"context"
+ "errors"
"go.uber.org/multierr"
+ "github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/measure"
@@ -42,13 +44,14 @@ const (
var _ run.Service = (*queryService)(nil)
type queryService struct {
- log *logger.Logger
metaService metadata.Repo
+ pipeline queue.Server
+ log *logger.Logger
sqp *streamQueryProcessor
mqp *measureQueryProcessor
tqp *topNQueryProcessor
closer *run.Closer
- pipeline queue.Server
+ nodeID string
}
// NewService return a new query service.
@@ -78,7 +81,13 @@ func (q *queryService) Name() string {
return moduleName
}
-func (q *queryService) PreRun(_ context.Context) error {
+func (q *queryService) PreRun(ctx context.Context) error {
+ val := ctx.Value(common.ContextNodeKey)
+ if val == nil {
+ return errors.New("node id is empty")
+ }
+ node := val.(common.Node)
+ q.nodeID = node.NodeID
q.log = logger.GetLogger(moduleName)
q.sqp.streamService = stream.NewPortableRepository(q.metaService, q.log)
q.mqp.measureService = measure.NewPortableRepository(q.metaService,
q.log)
diff --git a/banyand/dquery/measure.go b/banyand/dquery/measure.go
index 5f1c515a..be6a81d5 100644
--- a/banyand/dquery/measure.go
+++ b/banyand/dquery/measure.go
@@ -19,6 +19,8 @@ package dquery
import (
"context"
+ "errors"
+ "fmt"
"time"
"github.com/apache/skywalking-banyandb/api/common"
@@ -27,6 +29,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/measure"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/query"
"github.com/apache/skywalking-banyandb/pkg/query/executor"
logical_measure
"github.com/apache/skywalking-banyandb/pkg/query/logical/measure"
)
@@ -39,7 +42,8 @@ type measureQueryProcessor struct {
func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
queryCriteria, ok := message.Data().(*measurev1.QueryRequest)
- now := time.Now().UnixNano()
+ n := time.Now()
+ now := n.UnixNano()
if !ok {
resp = bus.NewMessage(bus.MessageID(now),
common.NewError("invalid event data type"))
return
@@ -79,8 +83,28 @@ func (p *measureQueryProcessor) Rev(message bus.Message)
(resp bus.Message) {
if e := ml.Debug(); e.Enabled() {
e.Str("plan", plan.String()).Msg("query plan")
}
-
- mIterator, err :=
plan.(executor.MeasureExecutable).Execute(executor.WithDistributedExecutionContext(context.Background(),
&distributedContext{
+ ctx := context.Background()
+ var tracer *query.Tracer
+ var span *query.Span
+ if queryCriteria.Trace {
+ tracer, ctx = query.NewTracer(ctx, n.Format(time.RFC3339Nano))
+ span, ctx = tracer.StartSpan(ctx, "distributed-%s",
p.queryService.nodeID)
+ span.Tag("plan", plan.String())
+ defer func() {
+ data := resp.Data()
+ switch d := data.(type) {
+ case *measurev1.QueryResponse:
+ d.Trace = tracer.ToProto()
+ case common.Error:
+ span.Error(errors.New(d.Msg()))
+ resp = bus.NewMessage(bus.MessageID(now),
&measurev1.QueryResponse{Trace: tracer.ToProto()})
+ default:
+ panic("unexpected data type")
+ }
+ span.Stop()
+ }()
+ }
+ mIterator, err :=
plan.(executor.MeasureExecutable).Execute(executor.WithDistributedExecutionContext(ctx,
&distributedContext{
Broadcaster: p.broadcaster,
timeRange: queryCriteria.TimeRange,
}))
@@ -92,18 +116,34 @@ func (p *measureQueryProcessor) Rev(message bus.Message)
(resp bus.Message) {
defer func() {
if err = mIterator.Close(); err != nil {
ml.Error().Err(err).RawJSON("req",
logger.Proto(queryCriteria)).Msg("fail to close the query plan")
+ if span != nil {
+ span.Error(fmt.Errorf("fail to close the query
plan: %w", err))
+ }
}
}()
result := make([]*measurev1.DataPoint, 0)
- for mIterator.Next() {
- current := mIterator.Current()
- if len(current) > 0 {
- result = append(result, current[0])
+ func() {
+ var r int
+ if tracer != nil {
+ iterSpan, _ := tracer.StartSpan(ctx, "iterator")
+ defer func() {
+ iterSpan.Tag("rounds", fmt.Sprintf("%d", r))
+ iterSpan.Tag("size", fmt.Sprintf("%d",
len(result)))
+ iterSpan.Stop()
+ }()
}
- }
+ for mIterator.Next() {
+ r++
+ current := mIterator.Current()
+ if len(current) > 0 {
+ result = append(result, current[0])
+ }
+ }
+ }()
+ qr := &measurev1.QueryResponse{DataPoints: result}
if e := ml.Debug(); e.Enabled() {
- e.RawJSON("ret",
logger.Proto(&measurev1.QueryResponse{DataPoints: result})).Msg("got a measure")
+ e.RawJSON("ret", logger.Proto(qr)).Msg("got a measure")
}
- resp = bus.NewMessage(bus.MessageID(now),
&measurev1.QueryResponse{DataPoints: result})
+ resp = bus.NewMessage(bus.MessageID(now), qr)
return
}
diff --git a/banyand/internal/storage/index.go
b/banyand/internal/storage/index.go
index 4dcac9d5..cb926e0e 100644
--- a/banyand/internal/storage/index.go
+++ b/banyand/internal/storage/index.go
@@ -38,6 +38,8 @@ import (
"github.com/apache/skywalking-banyandb/pkg/index/posting"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/query"
+ "github.com/apache/skywalking-banyandb/pkg/query/logical"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
@@ -88,15 +90,26 @@ func (s *seriesIndex) Write(docs index.Documents) error {
var rangeOpts = index.RangeOpts{}
-func (s *seriesIndex) searchPrimary(ctx context.Context, series
[]*pbv1.Series) (pbv1.SeriesList, error) {
+func (s *seriesIndex) searchPrimary(ctx context.Context, series
[]*pbv1.Series) (sl pbv1.SeriesList, err error) {
seriesMatchers := make([]index.SeriesMatcher, len(series))
for i := range series {
- var err error
seriesMatchers[i], err =
convertEntityValuesToSeriesMatcher(series[i])
if err != nil {
return nil, err
}
}
+ tracer := query.GetTracer(ctx)
+ var span *query.Span
+ if tracer != nil {
+ span, _ = tracer.StartSpan(ctx, "seriesIndex.searchPrimary")
+ span.Tagf("matchers", "%v", seriesMatchers)
+ defer func() {
+ if err != nil {
+ span.Error(err)
+ }
+ span.Stop()
+ }()
+ }
ss, err := s.store.Search(ctx, seriesMatchers)
if err != nil {
return nil, err
@@ -105,6 +118,9 @@ func (s *seriesIndex) searchPrimary(ctx context.Context,
series []*pbv1.Series)
if err != nil {
return nil, errors.WithMessagef(err, "failed to convert index
series to series list, matchers: %v, matched: %d", seriesMatchers, len(ss))
}
+ if span != nil {
+ span.Tagf("matched", "%d", len(result))
+ }
return result, nil
}
@@ -174,28 +190,54 @@ func convertIndexSeriesToSeriesList(indexSeries
[]index.Series) (pbv1.SeriesList
return seriesList, nil
}
-func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series,
filter index.Filter, order *pbv1.OrderBy, preloadSize int) (pbv1.SeriesList,
error) {
+func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series,
filter index.Filter, order *pbv1.OrderBy, preloadSize int) (sl pbv1.SeriesList,
err error) {
+ tracer := query.GetTracer(ctx)
+ if tracer != nil {
+ var span *query.Span
+ span, ctx = tracer.StartSpan(ctx, "seriesIndex.Search")
+ defer func() {
+ if err != nil {
+ span.Error(err)
+ }
+ span.Stop()
+ }()
+ }
seriesList, err := s.searchPrimary(ctx, series)
if err != nil {
return nil, err
}
pl := seriesList.ToList()
- if filter != nil {
+ if filter != nil && filter != logical.ENode {
var plFilter posting.List
// TODO: merge searchPrimary and filter
- plFilter, err = filter.Execute(func(_
databasev1.IndexRule_Type) (index.Searcher, error) {
- return s.store, nil
- }, 0)
+ func() {
+ if tracer != nil {
+ span, _ := tracer.StartSpan(ctx, "filter")
+ span.Tag("exp", filter.String())
+ defer func() {
+ if err != nil {
+ span.Error(err)
+ } else {
+ span.Tagf("matched", "%d",
plFilter.Len())
+ span.Tagf("total", "%d",
pl.Len())
+ }
+ span.Stop()
+ }()
+ }
+ if plFilter, err = filter.Execute(func(_
databasev1.IndexRule_Type) (index.Searcher, error) {
+ return s.store, nil
+ }, 0); err != nil {
+ return
+ }
+ if plFilter == nil {
+ return
+ }
+ err = pl.Intersect(plFilter)
+ }()
if err != nil {
return nil, err
}
- if plFilter == nil {
- return pbv1.SeriesList{}, nil
- }
- if err = pl.Intersect(plFilter); err != nil {
- return nil, err
- }
}
if order == nil || order.Index == nil {
@@ -205,6 +247,17 @@ func (s *seriesIndex) Search(ctx context.Context, series
[]*pbv1.Series, filter
fieldKey := index.FieldKey{
IndexRuleID: order.Index.GetMetadata().Id,
}
+ var span *query.Span
+ if tracer != nil {
+ span, _ = tracer.StartSpan(ctx, "sort")
+ span.Tagf("preload", "%d", preloadSize)
+ defer func() {
+ if err != nil {
+ span.Error(err)
+ }
+ span.Stop()
+ }()
+ }
// TODO:// merge searchPrimary and sort
iter, err := s.store.Iterator(fieldKey, rangeOpts, order.Sort,
preloadSize)
if err != nil {
@@ -215,7 +268,9 @@ func (s *seriesIndex) Search(ctx context.Context, series
[]*pbv1.Series, filter
}()
var sortedSeriesList pbv1.SeriesList
+ var r int
for iter.Next() {
+ r++
docID := iter.Val().DocID
if !pl.Contains(docID) {
continue
@@ -225,6 +280,10 @@ func (s *seriesIndex) Search(ctx context.Context, series
[]*pbv1.Series, filter
return nil, err
}
}
+ if span != nil {
+ span.Tagf("rounds", "%d", r)
+ span.Tagf("size", "%d", len(sortedSeriesList))
+ }
return sortedSeriesList, err
}
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index 6b480738..175c687c 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -34,8 +34,10 @@ import (
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/accesslog"
"github.com/apache/skywalking-banyandb/pkg/bus"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/query"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
@@ -145,21 +147,36 @@ func (ms *measureService) Write(measure
measurev1.MeasureService_WriteServer) er
var emptyMeasureQueryResponse = &measurev1.QueryResponse{DataPoints:
make([]*measurev1.DataPoint, 0)}
-func (ms *measureService) Query(_ context.Context, req
*measurev1.QueryRequest) (*measurev1.QueryResponse, error) {
- if err := timestamp.CheckTimeRange(req.GetTimeRange()); err != nil {
+func (ms *measureService) Query(_ context.Context, req
*measurev1.QueryRequest) (resp *measurev1.QueryResponse, err error) {
+ if err = timestamp.CheckTimeRange(req.GetTimeRange()); err != nil {
return nil, status.Errorf(codes.InvalidArgument, "%v is invalid
:%s", req.GetTimeRange(), err)
}
- message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), req)
- feat, errQuery := ms.broadcaster.Publish(data.TopicMeasureQuery,
message)
- if errQuery != nil {
- return nil, errQuery
+ now := time.Now()
+ if req.Trace {
+ ctx := context.TODO()
+ tracer, _ := query.NewTracer(ctx, now.Format(time.RFC3339Nano))
+ span, _ := tracer.StartSpan(ctx, "measure-grpc")
+ span.Tag("request", convert.BytesToString(logger.Proto(req)))
+ defer func() {
+ if err != nil {
+ span.Error(err)
+ } else {
+ span.AddSubTrace(resp.Trace)
+ resp.Trace = tracer.ToProto()
+ }
+ span.Stop()
+ }()
}
- msg, errFeat := feat.Get()
- if errFeat != nil {
- if errors.Is(errFeat, io.EOF) {
+ feat, err := ms.broadcaster.Publish(data.TopicMeasureQuery,
bus.NewMessage(bus.MessageID(now.UnixNano()), req))
+ if err != nil {
+ return nil, err
+ }
+ msg, err := feat.Get()
+ if err != nil {
+ if errors.Is(err, io.EOF) {
return emptyMeasureQueryResponse, nil
}
- return nil, errFeat
+ return nil, err
}
data := msg.Data()
switch d := data.(type) {
diff --git a/banyand/measure/introducer.go b/banyand/measure/introducer.go
index 8ba0118a..7fce11d1 100644
--- a/banyand/measure/introducer.go
+++ b/banyand/measure/introducer.go
@@ -40,11 +40,12 @@ func generateIntroduction() *introduction {
if v == nil {
return &introduction{}
}
- return v.(*introduction)
+ i := v.(*introduction)
+ i.reset()
+ return i
}
func releaseIntroduction(i *introduction) {
- i.reset()
introductionPool.Put(i)
}
@@ -69,11 +70,12 @@ func generateFlusherIntroduction() *flusherIntroduction {
flushed: make(map[uint64]*partWrapper),
}
}
- return v.(*flusherIntroduction)
+ i := v.(*flusherIntroduction)
+ i.reset()
+ return i
}
func releaseFlusherIntroduction(i *flusherIntroduction) {
- i.reset()
flusherIntroductionPool.Put(i)
}
@@ -100,11 +102,12 @@ func generateMergerIntroduction() *mergerIntroduction {
if v == nil {
return &mergerIntroduction{}
}
- return v.(*mergerIntroduction)
+ i := v.(*mergerIntroduction)
+ i.reset()
+ return i
}
func releaseMergerIntroduction(i *mergerIntroduction) {
- i.reset()
mergerIntroductionPool.Put(i)
}
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index 168a5663..c153a301 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -63,17 +63,16 @@ type queryOptions struct {
maxTimestamp int64
}
-func (s *measure) Query(ctx context.Context, mqo pbv1.MeasureQueryOptions)
(pbv1.MeasureQueryResult, error) {
+func (s *measure) Query(ctx context.Context, mqo pbv1.MeasureQueryOptions)
(mqr pbv1.MeasureQueryResult, err error) {
if mqo.TimeRange == nil || len(mqo.Entities) < 1 {
return nil, errors.New("invalid query options: timeRange and
series are required")
}
if len(mqo.TagProjection) == 0 && len(mqo.FieldProjection) == 0 {
return nil, errors.New("invalid query options: tagProjection or
fieldProjection is required")
}
- var result queryResult
db := s.databaseSupplier.SupplyTSDB()
if db == nil {
- return &result, nil
+ return mqr, nil
}
tsdb := db.(storage.TSDB[*tsTable, option])
tabWrappers := tsdb.SelectTSTables(*mqo.TimeRange)
@@ -95,7 +94,7 @@ func (s *measure) Query(ctx context.Context, mqo
pbv1.MeasureQueryOptions) (pbv1
return nil, err
}
if len(sl) < 1 {
- return &result, nil
+ return mqr, nil
}
var sids []common.SeriesID
for i := range sl {
@@ -108,6 +107,7 @@ func (s *measure) Query(ctx context.Context, mqo
pbv1.MeasureQueryOptions) (pbv1
maxTimestamp: mqo.TimeRange.End.UnixNano(),
}
var n int
+ var result queryResult
for i := range tabWrappers {
s := tabWrappers[i].Table().currentSnapshot()
if s == nil {
@@ -120,48 +120,58 @@ func (s *measure) Query(ctx context.Context, mqo
pbv1.MeasureQueryOptions) (pbv1
}
result.snapshots = append(result.snapshots, s)
}
- bma := generateBlockMetadataArray()
- defer releaseBlockMetadataArray(bma)
- // TODO: cache tstIter
- var tstIter tstIter
- defer tstIter.reset()
- originalSids := make([]common.SeriesID, len(sids))
- copy(originalSids, sids)
- sort.Slice(sids, func(i, j int) bool { return sids[i] < sids[j] })
- tstIter.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp)
- if tstIter.Error() != nil {
- return nil, fmt.Errorf("cannot init tstIter: %w",
tstIter.Error())
- }
- projectedEntityOffsets, tagProjectionOnPart := s.parseTagProjection(qo,
&result)
- result.tagProjection = qo.TagProjection
- qo.TagProjection = tagProjectionOnPart
- for tstIter.nextBlock() {
- bc := generateBlockCursor()
- p := tstIter.piHeap[0]
-
- seriesID := p.curBlock.seriesID
- if result.entityValues != nil && result.entityValues[seriesID]
== nil {
- for i := range sl {
- if sl[i].ID == seriesID {
- tag :=
make(map[string]*modelv1.TagValue)
- for name, offset := range
projectedEntityOffsets {
- tag[name] =
sl[i].EntityValues[offset]
+
+ func() {
+ bma := generateBlockMetadataArray()
+ defer releaseBlockMetadataArray(bma)
+ defFn := startBlockScanSpan(ctx, len(sids), parts, &result)
+ defer defFn()
+ // TODO: cache tstIter
+ var tstIter tstIter
+ defer tstIter.reset()
+ originalSids := make([]common.SeriesID, len(sids))
+ copy(originalSids, sids)
+ sort.Slice(sids, func(i, j int) bool { return sids[i] < sids[j]
})
+ tstIter.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp)
+ if tstIter.Error() != nil {
+ err = fmt.Errorf("cannot init tstIter: %w",
tstIter.Error())
+ return
+ }
+ projectedEntityOffsets, tagProjectionOnPart :=
s.parseTagProjection(qo, &result)
+ result.tagProjection = qo.TagProjection
+ qo.TagProjection = tagProjectionOnPart
+
+ for tstIter.nextBlock() {
+ bc := generateBlockCursor()
+ p := tstIter.piHeap[0]
+
+ seriesID := p.curBlock.seriesID
+ if result.entityValues != nil &&
result.entityValues[seriesID] == nil {
+ for i := range sl {
+ if sl[i].ID == seriesID {
+ tag :=
make(map[string]*modelv1.TagValue)
+ for name, offset := range
projectedEntityOffsets {
+ tag[name] =
sl[i].EntityValues[offset]
+ }
+ result.entityValues[seriesID] =
tag
}
- result.entityValues[seriesID] = tag
}
}
+ bc.init(p.p, p.curBlock, qo)
+ result.data = append(result.data, bc)
}
- bc.init(p.p, p.curBlock, qo)
- result.data = append(result.data, bc)
- }
- if tstIter.Error() != nil {
- return nil, fmt.Errorf("cannot iterate tstIter: %w",
tstIter.Error())
+ if tstIter.Error() != nil {
+ err = fmt.Errorf("cannot iterate tstIter: %w",
tstIter.Error())
+ }
+ result.sidToIndex = make(map[common.SeriesID]int)
+ for i, si := range originalSids {
+ result.sidToIndex[si] = i
+ }
+ }()
+ if err != nil {
+ return nil, err
}
- result.sidToIndex = make(map[common.SeriesID]int)
- for i, si := range originalSids {
- result.sidToIndex[si] = i
- }
if mqo.Order == nil {
result.ascTS = true
} else if mqo.Order.Sort == modelv1.Sort_SORT_ASC || mqo.Order.Sort ==
modelv1.Sort_SORT_UNSPECIFIED {
diff --git a/banyand/measure/trace.go b/banyand/measure/trace.go
new file mode 100644
index 00000000..ccf51f3a
--- /dev/null
+++ b/banyand/measure/trace.go
@@ -0,0 +1,74 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/dustin/go-humanize"
+
+ "github.com/apache/skywalking-banyandb/pkg/query"
+)
+
+const (
+ partMetadataHeader = "MinTimestamp, MaxTimestamp, CompressionSize,
UncompressedSize, TotalCount, BlocksCount"
+ blockHeader = "PartID, SeriesID, MinTimestamp, MaxTimestamp,
Count, UncompressedSize"
+)
+
+func (pm *partMetadata) String() string {
+ minTimestamp := time.Unix(0, pm.MinTimestamp).Format(time.Stamp)
+ maxTimestamp := time.Unix(0, pm.MaxTimestamp).Format(time.Stamp)
+
+ return fmt.Sprintf("%s, %s, %s, %s, %s, %s",
+ minTimestamp, maxTimestamp,
humanize.Bytes(pm.CompressedSizeBytes),
+ humanize.Bytes(pm.UncompressedSizeBytes),
humanize.Comma(int64(pm.TotalCount)),
+ humanize.Comma(int64(pm.BlocksCount)))
+}
+
+func (bc *blockCursor) String() string {
+ minTimestamp := time.Unix(0, bc.minTimestamp).Format(time.Stamp)
+ maxTimestamp := time.Unix(0, bc.maxTimestamp).Format(time.Stamp)
+
+ return fmt.Sprintf("%d, %d, %s, %s, %d, %s",
+ bc.p.partMetadata.ID, bc.bm.seriesID, minTimestamp,
maxTimestamp, bc.bm.count, humanize.Bytes(bc.bm.uncompressedSizeBytes))
+}
+
+func startBlockScanSpan(ctx context.Context, sids int, parts []*part, qr
*queryResult) func() {
+ tracer := query.GetTracer(ctx)
+ if tracer == nil {
+ return func() {}
+ }
+
+ span, _ := tracer.StartSpan(ctx, "scan-blocks")
+ span.Tag("series_num", fmt.Sprintf("%d", sids))
+ span.Tag("part_header", partMetadataHeader)
+ for i := range parts {
+ span.Tag(fmt.Sprintf("part_%d_%s", parts[i].partMetadata.ID,
parts[i].path),
+ parts[i].partMetadata.String())
+ }
+
+ return func() {
+ span.Tag("block_header", blockHeader)
+ for i := range qr.data {
+ span.Tag(fmt.Sprintf("block_%d", i),
qr.data[i].String())
+ }
+ span.Stop()
+ }
+}
diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go
index c60ef7d0..61ba8164 100644
--- a/banyand/metadata/client.go
+++ b/banyand/metadata/client.go
@@ -150,8 +150,10 @@ func (s *clientService) Serve() run.StopNotify {
func (s *clientService) GracefulStop() {
s.closer.Done()
s.closer.CloseThenWait()
- if err := s.schemaRegistry.Close(); err != nil {
- logger.GetLogger(s.Name()).Error().Err(err).Msg("failed to
close schema registry")
+ if s.schemaRegistry != nil {
+ if err := s.schemaRegistry.Close(); err != nil {
+ logger.GetLogger(s.Name()).Error().Err(err).Msg("failed
to close schema registry")
+ }
}
}
diff --git a/banyand/metadata/embeddedserver/server.go
b/banyand/metadata/embeddedserver/server.go
index 3fbe9f6c..44bd4864 100644
--- a/banyand/metadata/embeddedserver/server.go
+++ b/banyand/metadata/embeddedserver/server.go
@@ -86,8 +86,10 @@ func (s *server) Serve() run.StopNotify {
func (s *server) GracefulStop() {
s.Service.GracefulStop()
- s.metaServer.Close()
- <-s.metaServer.StopNotify()
+ if s.metaServer != nil {
+ s.metaServer.Close()
+ <-s.metaServer.StopNotify()
+ }
}
// NewService returns a new metadata repository Service.
diff --git a/banyand/metadata/metadata_test.go
b/banyand/metadata/metadata_test.go
index 09a5bbaf..981ab416 100644
--- a/banyand/metadata/metadata_test.go
+++ b/banyand/metadata/metadata_test.go
@@ -22,6 +22,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
"github.com/apache/skywalking-banyandb/api/common"
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
@@ -39,29 +40,30 @@ func Test_service_RulesBySubject(t *testing.T) {
subject *commonv1.Metadata
}
is := assert.New(t)
+ req := require.New(t)
is.NoError(logger.Init(logger.Logging{
Env: "dev",
Level: flags.LogLevel,
}))
ctx := context.TODO()
s, _ := embeddedserver.NewService(ctx)
- is.NotNil(s)
+ req.NotNil(s)
rootDir, deferFn, err := testhelper.NewSpace()
- is.NoError(err)
+ req.NoError(err)
err = s.FlagSet().Parse([]string{"--metadata-root-path=" + rootDir})
- is.NoError(err)
- is.NoError(s.Validate())
+ req.NoError(err)
+ req.NoError(s.Validate())
ctx = context.WithValue(ctx, common.ContextNodeKey, common.Node{NodeID:
"test"})
ctx = context.WithValue(ctx, common.ContextNodeRolesKey,
[]databasev1.Role{databasev1.Role_ROLE_META})
err = s.PreRun(ctx)
- is.NoError(err)
+ req.NoError(err)
defer func() {
s.GracefulStop()
deferFn()
}()
err = test.PreloadSchema(ctx, s.SchemaRegistry())
- is.NoError(err)
+ req.NoError(err)
tests := []struct {
name string
diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index ccb039e9..5a361525 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -19,6 +19,9 @@ package query
import (
"context"
+ "errors"
+ "fmt"
+ "runtime/debug"
"time"
"go.uber.org/multierr"
@@ -34,6 +37,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/stream"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/query"
"github.com/apache/skywalking-banyandb/pkg/query/executor"
logical_measure
"github.com/apache/skywalking-banyandb/pkg/query/logical/measure"
logical_stream
"github.com/apache/skywalking-banyandb/pkg/query/logical/stream"
@@ -52,13 +56,13 @@ var (
)
type queryService struct {
- log *logger.Logger
- // TODO: remove the metaService once
https://github.com/apache/skywalking/issues/10121 is fixed.
metaService metadata.Repo
pipeline queue.Server
+ log *logger.Logger
sqp *streamQueryProcessor
mqp *measureQueryProcessor
tqp *topNQueryProcessor
+ nodeID string
}
type streamQueryProcessor struct {
@@ -76,6 +80,12 @@ func (p *streamQueryProcessor) Rev(message bus.Message)
(resp bus.Message) {
if p.log.Debug().Enabled() {
p.log.Debug().RawJSON("criteria",
logger.Proto(queryCriteria)).Msg("received a query request")
}
+ defer func() {
+ if err := recover(); err != nil {
+ p.log.Error().Interface("err", err).RawJSON("req",
logger.Proto(queryCriteria)).Str("stack", string(debug.Stack())).Msg("panic")
+ resp =
bus.NewMessage(bus.MessageID(time.Now().UnixNano()), common.NewError("panic"))
+ }
+ }()
// TODO: support multiple groups
if len(queryCriteria.Groups) > 1 {
resp = bus.NewMessage(bus.MessageID(now), common.NewError("only
support one group in the query request"))
@@ -124,11 +134,18 @@ type measureQueryProcessor struct {
func (p *measureQueryProcessor) Rev(message bus.Message) (resp bus.Message) {
queryCriteria, ok := message.Data().(*measurev1.QueryRequest)
- now := time.Now().UnixNano()
+ n := time.Now()
+ now := n.UnixNano()
if !ok {
resp = bus.NewMessage(bus.MessageID(now),
common.NewError("invalid event data type"))
return
}
+ defer func() {
+ if err := recover(); err != nil {
+ p.log.Error().Interface("err", err).RawJSON("req",
logger.Proto(queryCriteria)).Str("stack", string(debug.Stack())).Msg("panic")
+ resp =
bus.NewMessage(bus.MessageID(time.Now().UnixNano()), common.NewError("panic"))
+ }
+ }()
// TODO: support multiple groups
if len(queryCriteria.Groups) > 1 {
resp = bus.NewMessage(bus.MessageID(now), common.NewError("only
support one group in the query request"))
@@ -160,12 +177,33 @@ func (p *measureQueryProcessor) Rev(message bus.Message)
(resp bus.Message) {
resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail
to analyze the query request for measure %s: %v", meta.GetName(), err))
return
}
+ ctx := context.Background()
+ var tracer *query.Tracer
+ var span *query.Span
+ if queryCriteria.Trace {
+ tracer, ctx = query.NewTracer(ctx, n.Format(time.RFC3339Nano))
+ span, ctx = tracer.StartSpan(ctx, "data-%s",
p.queryService.nodeID)
+ span.Tag("plan", plan.String())
+ defer func() {
+ data := resp.Data()
+ switch d := data.(type) {
+ case *measurev1.QueryResponse:
+ d.Trace = tracer.ToProto()
+ case common.Error:
+ span.Error(errors.New(d.Msg()))
+ resp = bus.NewMessage(bus.MessageID(now),
&measurev1.QueryResponse{Trace: tracer.ToProto()})
+ default:
+ panic("unexpected data type")
+ }
+ span.Stop()
+ }()
+ }
if e := ml.Debug(); e.Enabled() {
e.Str("plan", plan.String()).Msg("query plan")
}
- mIterator, err :=
plan.(executor.MeasureExecutable).Execute(executor.WithMeasureExecutionContext(context.Background(),
ec))
+ mIterator, err :=
plan.(executor.MeasureExecutable).Execute(executor.WithMeasureExecutionContext(ctx,
ec))
if err != nil {
ml.Error().Err(err).RawJSON("req",
logger.Proto(queryCriteria)).Msg("fail to close the query plan")
resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail
to execute the query plan for measure %s: %v", meta.GetName(), err))
@@ -174,19 +212,36 @@ func (p *measureQueryProcessor) Rev(message bus.Message)
(resp bus.Message) {
defer func() {
if err = mIterator.Close(); err != nil {
ml.Error().Err(err).RawJSON("req",
logger.Proto(queryCriteria)).Msg("fail to close the query plan")
+ if span != nil {
+ span.Error(fmt.Errorf("fail to close the query
plan: %w", err))
+ }
}
}()
+
result := make([]*measurev1.DataPoint, 0)
- for mIterator.Next() {
- current := mIterator.Current()
- if len(current) > 0 {
- result = append(result, current[0])
+ func() {
+ var r int
+ if tracer != nil {
+ iterSpan, _ := tracer.StartSpan(ctx, "iterator")
+ defer func() {
+ iterSpan.Tag("rounds", fmt.Sprintf("%d", r))
+ iterSpan.Tag("size", fmt.Sprintf("%d",
len(result)))
+ iterSpan.Stop()
+ }()
}
- }
+ for mIterator.Next() {
+ r++
+ current := mIterator.Current()
+ if len(current) > 0 {
+ result = append(result, current[0])
+ }
+ }
+ }()
+ qr := &measurev1.QueryResponse{DataPoints: result}
if e := ml.Debug(); e.Enabled() {
- e.RawJSON("ret",
logger.Proto(&measurev1.QueryResponse{DataPoints: result})).Msg("got a measure")
+ e.RawJSON("ret", logger.Proto(qr)).Msg("got a measure")
}
- resp = bus.NewMessage(bus.MessageID(now),
&measurev1.QueryResponse{DataPoints: result})
+ resp = bus.NewMessage(bus.MessageID(now), qr)
return
}
@@ -194,7 +249,13 @@ func (q *queryService) Name() string {
return moduleName
}
-func (q *queryService) PreRun(_ context.Context) error {
+func (q *queryService) PreRun(ctx context.Context) error {
+ val := ctx.Value(common.ContextNodeKey)
+ if val == nil {
+ return errors.New("node id is empty")
+ }
+ node := val.(common.Node)
+ q.nodeID = node.NodeID
q.log = logger.GetLogger(moduleName)
return multierr.Combine(
q.pipeline.Subscribe(data.TopicStreamQuery, q.sqp),
diff --git a/banyand/stream/introducer.go b/banyand/stream/introducer.go
index cd215263..76c6e659 100644
--- a/banyand/stream/introducer.go
+++ b/banyand/stream/introducer.go
@@ -40,11 +40,12 @@ func generateIntroduction() *introduction {
if v == nil {
return &introduction{}
}
- return v.(*introduction)
+ intro := v.(*introduction)
+ intro.reset()
+ return intro
}
func releaseIntroduction(i *introduction) {
- i.reset()
introductionPool.Put(i)
}
@@ -69,11 +70,12 @@ func generateFlusherIntroduction() *flusherIntroduction {
flushed: make(map[uint64]*partWrapper),
}
}
- return v.(*flusherIntroduction)
+ fi := v.(*flusherIntroduction)
+ fi.reset()
+ return fi
}
func releaseFlusherIntroduction(i *flusherIntroduction) {
- i.reset()
flusherIntroductionPool.Put(i)
}
@@ -100,11 +102,12 @@ func generateMergerIntroduction() *mergerIntroduction {
if v == nil {
return &mergerIntroduction{}
}
- return v.(*mergerIntroduction)
+ mi := v.(*mergerIntroduction)
+ mi.reset()
+ return mi
}
func releaseMergerIntroduction(i *mergerIntroduction) {
- i.reset()
mergerIntroductionPool.Put(i)
}
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 2c2522f1..c4107a80 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -416,6 +416,7 @@ Span is the basic unit of a trace.
| tags | [Tag](#banyandb-common-v1-Tag) | repeated | tags is a list of tags of the span. |
| message | [string](#string) | | message is the message generated by the span. |
| children | [Span](#banyandb-common-v1-Span) | repeated | children is a list of child spans of the span. |
+| duration | [int64](#int64) | | duration is the duration of the span. |
diff --git a/pkg/query/doc.go b/pkg/query/doc.go
new file mode 100644
index 00000000..1c2d3bf7
--- /dev/null
+++ b/pkg/query/doc.go
@@ -0,0 +1,19 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package query provides the query common interfaces and utilities.
+package query
diff --git a/pkg/query/logical/index_filter.go
b/pkg/query/logical/index_filter.go
index ceafa695..8450c94e 100644
--- a/pkg/query/logical/index_filter.go
+++ b/pkg/query/logical/index_filter.go
@@ -141,28 +141,44 @@ func parseCondition(cond *modelv1.Condition, indexRule
*databasev1.IndexRule, ex
return newNot(indexRule, newEq(indexRule, expr)),
[][]*modelv1.TagValue{entity}, nil
case modelv1.Condition_BINARY_OP_HAVING:
bb := expr.Bytes()
- and := newAnd(len(bb))
+ l := len(bb)
+ if l < 1 {
+ return ENode, [][]*modelv1.TagValue{entity}, nil
+ }
+ and := newAnd(l)
for _, b := range bb {
and.append(newEq(indexRule, newBytesLiteral(b)))
}
return and, [][]*modelv1.TagValue{entity}, nil
case modelv1.Condition_BINARY_OP_NOT_HAVING:
bb := expr.Bytes()
- and := newAnd(len(bb))
+ l := len(bb)
+ if l < 1 {
+ return ENode, [][]*modelv1.TagValue{entity}, nil
+ }
+ and := newAnd(l)
for _, b := range bb {
and.append(newEq(indexRule, newBytesLiteral(b)))
}
return newNot(indexRule, and), [][]*modelv1.TagValue{entity},
nil
case modelv1.Condition_BINARY_OP_IN:
bb := expr.Bytes()
- or := newOr(len(bb))
+ l := len(bb)
+ if l < 1 {
+ return ENode, [][]*modelv1.TagValue{entity}, nil
+ }
+ or := newOr(l)
for _, b := range bb {
or.append(newEq(indexRule, newBytesLiteral(b)))
}
return or, [][]*modelv1.TagValue{entity}, nil
case modelv1.Condition_BINARY_OP_NOT_IN:
bb := expr.Bytes()
- or := newOr(len(bb))
+ l := len(bb)
+ if l < 1 {
+ return ENode, [][]*modelv1.TagValue{entity}, nil
+ }
+ or := newOr(l)
for _, b := range bb {
or.append(newEq(indexRule, newBytesLiteral(b)))
}
diff --git a/pkg/query/logical/measure/measure_plan_distributed.go
b/pkg/query/logical/measure/measure_plan_distributed.go
index 31226984..bd4377a3 100644
--- a/pkg/query/logical/measure/measure_plan_distributed.go
+++ b/pkg/query/logical/measure/measure_plan_distributed.go
@@ -33,7 +33,9 @@ import (
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/iter/sort"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/query"
"github.com/apache/skywalking-banyandb/pkg/query/executor"
"github.com/apache/skywalking-banyandb/pkg/query/logical"
)
@@ -155,29 +157,46 @@ type distributedPlan struct {
maxDataPointsSize uint32
}
-func (t *distributedPlan) Execute(ctx context.Context) (executor.MIterator,
error) {
+func (t *distributedPlan) Execute(ctx context.Context) (mi executor.MIterator,
err error) {
dctx := executor.FromDistributedExecutionContext(ctx)
- query := proto.Clone(t.queryTemplate).(*measurev1.QueryRequest)
- query.TimeRange = dctx.TimeRange()
+ queryRequest := proto.Clone(t.queryTemplate).(*measurev1.QueryRequest)
+ queryRequest.TimeRange = dctx.TimeRange()
if t.maxDataPointsSize > 0 {
- query.Limit = t.maxDataPointsSize
+ queryRequest.Limit = t.maxDataPointsSize
}
- var allErr error
- ff, err := dctx.Broadcast(defaultQueryTimeout, data.TopicMeasureQuery,
bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), query))
+ tracer := query.GetTracer(ctx)
+ var span *query.Span
+ if tracer != nil {
+ span, _ = tracer.StartSpan(ctx, "distributed-client")
+ queryRequest.Trace = true
+ span.Tag("request",
convert.BytesToString(logger.Proto(queryRequest)))
+ defer func() {
+ if err != nil {
+ span.Error(err)
+ } else {
+ span.Stop()
+ }
+ }()
+ }
+ ff, err := dctx.Broadcast(defaultQueryTimeout, data.TopicMeasureQuery,
bus.NewMessage(bus.MessageID(dctx.TimeRange().Begin.Nanos), queryRequest))
if err != nil {
return nil, err
}
var see []sort.Iterator[*comparableDataPoint]
for _, f := range ff {
if m, getErr := f.Get(); getErr != nil {
- allErr = multierr.Append(allErr, getErr)
+ err = multierr.Append(err, getErr)
} else {
d := m.Data()
if d == nil {
continue
}
+ resp := d.(*measurev1.QueryResponse)
+ if span != nil {
+ span.AddSubTrace(resp.Trace)
+ }
see = append(see,
-
newSortableElements(d.(*measurev1.QueryResponse).DataPoints,
+ newSortableElements(resp.DataPoints,
t.sortByTime, t.sortTagSpec))
}
}
@@ -185,7 +204,7 @@ func (t *distributedPlan) Execute(ctx context.Context)
(executor.MIterator, erro
Iterator: sort.NewItemIter(see, t.desc),
}
smi.init()
- return smi, allErr
+ return smi, err
}
func (t *distributedPlan) String() string {
diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go
b/pkg/query/logical/measure/measure_plan_indexscan_local.go
index c5de0da2..2e8d7304 100644
--- a/pkg/query/logical/measure/measure_plan_indexscan_local.go
+++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go
@@ -30,6 +30,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/query"
"github.com/apache/skywalking-banyandb/pkg/query/executor"
"github.com/apache/skywalking-banyandb/pkg/query/logical"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -153,6 +154,8 @@ func (i *localIndexScan) Execute(ctx context.Context) (mit
executor.MIterator, e
orderByType = pbv1.OrderByTypeSeries
}
ec := executor.FromMeasureExecutionContext(ctx)
+ ctx, stop := i.startSpan(ctx, query.GetTracer(ctx), orderByType,
orderBy)
+ defer stop(err)
result, err := ec.Query(ctx, pbv1.MeasureQueryOptions{
Name: i.metadata.GetName(),
TimeRange: &i.timeRange,
@@ -209,6 +212,9 @@ type resultMIterator struct {
}
func (ei *resultMIterator) Next() bool {
+ if ei.result == nil {
+ return false
+ }
ei.i++
if ei.i < len(ei.current) {
return true
@@ -256,6 +262,33 @@ func (ei *resultMIterator) Current()
[]*measurev1.DataPoint {
}
func (ei *resultMIterator) Close() error {
- ei.result.Release()
+ if ei.result != nil {
+ ei.result.Release()
+ }
return nil
}
+
+// startSpan opens an "indexScan" span on tracer, tags it with the ordering in
+// use and with the plan details, and returns the span-carrying context
+// together with a function that closes the span. That function records the
+// error it is handed when the error is non-nil and then stops the span.
+// With a nil tracer it returns ctx unchanged and a no-op function.
+//
+// The returned function only sees the error value passed to it. A caller that
+// wants the final value of a named result must invoke it from inside a
+// deferred closure; deferring the call directly captures the error as it is
+// at the time the defer statement runs.
+func (i *localIndexScan) startSpan(ctx context.Context, tracer *query.Tracer, orderType pbv1.OrderByType, orderBy *pbv1.OrderBy) (context.Context, func(error)) {
+	if tracer == nil {
+		return ctx, func(error) {}
+	}
+
+	span, ctx := tracer.StartSpan(ctx, "indexScan-%s", i.metadata)
+	switch orderType {
+	case pbv1.OrderByTypeTime:
+		// Guard against a nil orderBy: fall back to the zero Sort value
+		// instead of dereferencing the pointer.
+		var sortValue int32
+		if orderBy != nil {
+			sortValue = int32(orderBy.Sort)
+		}
+		span.Tag("orderBy", "time "+modelv1.Sort_name[sortValue])
+	case pbv1.OrderByTypeIndex:
+		var ruleName string
+		if orderBy != nil {
+			// The generated getters tolerate a nil Index or Metadata.
+			ruleName = orderBy.Index.GetMetadata().GetName()
+		}
+		span.Tag("orderBy", fmt.Sprintf("indexRule:%s", ruleName))
+	case pbv1.OrderByTypeSeries:
+		span.Tag("orderBy", "series")
+	}
+	span.Tag("details", i.String())
+
+	return ctx, func(err error) {
+		if err != nil {
+			span.Error(err)
+		}
+		span.Stop()
+	}
+}
diff --git a/pkg/query/tracer.go b/pkg/query/tracer.go
new file mode 100644
index 00000000..4d761273
--- /dev/null
+++ b/pkg/query/tracer.go
@@ -0,0 +1,155 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package query
+
+import (
+ "context"
+ "fmt"
+
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+)
+
+// Context keys under which the active span and the tracer are stored.
+var (
+ spanKey = spanContextKey{}
+ tracerKey = tracerContextKey{}
+)
+
+// Dedicated empty struct types used as the context key types above.
+type (
+ spanContextKey struct{}
+ tracerContextKey struct{}
+)
+
+// Tracer is a simple tracer for query.
+// It accumulates spans into a single commonv1.Trace message.
+type Tracer struct {
+ data *commonv1.Trace // trace under construction; exposed through ToProto
+}
+
+// NewTracer returns the tracer already carried by ctx, if there is one,
+// together with ctx unchanged. Otherwise it creates a tracer whose trace ID is
+// id and returns it with a derived context that carries it.
+func NewTracer(ctx context.Context, id string) (*Tracer, context.Context) {
+	if existing := GetTracer(ctx); existing != nil {
+		return existing, ctx
+	}
+	created := &Tracer{data: &commonv1.Trace{TraceId: id}}
+	return created, context.WithValue(ctx, tracerKey, created)
+}
+
+// GetTracer returns the tracer stored in ctx, or nil when ctx carries none.
+// It panics if the value stored under the tracer key is not a *Tracer.
+func GetTracer(ctx context.Context) *Tracer {
+	switch v := ctx.Value(tracerKey).(type) {
+	case nil:
+		return nil
+	case *Tracer:
+		return v
+	default:
+		panic(fmt.Errorf("invalid tracer context value: %v", v))
+	}
+}
+
+// StartSpan starts a new span whose message is format rendered with args and
+// whose start time is now. If ctx already carries a span, the new span becomes
+// its child; otherwise it is added to the trace as a top-level span. The
+// returned context carries the new span.
+func (t *Tracer) StartSpan(ctx context.Context, format string, args ...interface{}) (*Span, context.Context) {
+	started := &Span{
+		tracer: t,
+		data: &commonv1.Span{
+			Message:   fmt.Sprintf(format, args...),
+			StartTime: timestamppb.Now(),
+		},
+	}
+	if parent, ok := ctx.Value(spanKey).(*Span); ok {
+		parent.addChild(started.data)
+	} else {
+		t.data.Spans = append(t.data.Spans, started.data)
+	}
+	return started, context.WithValue(ctx, spanKey, started)
+}
+
+// ToProto returns the proto representation of the tracer.
+// The returned message is the tracer's own trace, not a copy.
+func (t *Tracer) ToProto() *commonv1.Trace {
+ return t.data
+}
+
+// Span is a span of the tracer.
+// It wraps the protobuf span being built and keeps a reference to the tracer
+// that started it.
+type Span struct {
+ data *commonv1.Span // message populated by Tag, Error and Stop
+ tracer *Tracer // owning tracer; Error flags its trace as failed
+}
+
+// addChild appends child to this span's children. A child that is already
+// flagged as failed marks this span as failed too.
+func (s *Span) addChild(child *commonv1.Span) {
+	s.data.Children = append(s.data.Children, child)
+	if !child.Error {
+		return
+	}
+	s.Error(fmt.Errorf("sub span error"))
+}
+
+// AddSubTrace attaches every top-level span of trace as a child of this span.
+// A nil trace is ignored.
+func (s *Span) AddSubTrace(trace *commonv1.Trace) {
+	if trace != nil {
+		for _, sub := range trace.Spans {
+			s.addChild(sub)
+		}
+	}
+}
+
+// Tag appends a key/value tag to the span and returns the span so that calls
+// can be chained.
+func (s *Span) Tag(key, value string) *Span {
+	tag := &commonv1.Tag{Key: key, Value: value}
+	s.data.Tags = append(s.data.Tags, tag)
+	return s
+}
+
+// Tagf appends a tag whose value is format rendered with args and returns the
+// span so that calls can be chained.
+func (s *Span) Tagf(key, format string, args ...any) *Span {
+	return s.Tag(key, fmt.Sprintf(format, args...))
+}
+
+// Error marks the span, and the trace that owns it, as failed and records
+// err's message under the "error_msg" tag. Only the first error is recorded:
+// calling Error on a span that is already marked as failed changes nothing.
+// A nil err is ignored, so the span is neither flagged nor tagged and the
+// call cannot panic on err.Error().
+// It returns the span so that calls can be chained.
+func (s *Span) Error(err error) *Span {
+	if err == nil || s.data.Error {
+		return s
+	}
+	s.data.Error = true
+	s.Tag("error_msg", err.Error())
+	s.tracer.data.Error = true
+	return s
+}
+
+// Stop records the end time of the span and derives its duration, in whole
+// milliseconds, from the recorded start and end timestamps.
+func (s *Span) Stop() {
+	end := timestamppb.Now()
+	s.data.EndTime = end
+	elapsed := end.AsTime().Sub(s.data.StartTime.AsTime())
+	s.data.Duration = elapsed.Milliseconds()
+}
diff --git a/pkg/query/tracer_test.go b/pkg/query/tracer_test.go
new file mode 100644
index 00000000..cfa4c666
--- /dev/null
+++ b/pkg/query/tracer_test.go
@@ -0,0 +1,131 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package query
+
+import (
+ "context"
+ "fmt"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+)
+
+// TestNewTracer checks that NewTracer yields a tracer and a derived context.
+func TestNewTracer(t *testing.T) {
+	base := context.Background()
+	tracer, derived := NewTracer(base, "test-trace-id")
+	assert.NotNil(t, tracer)
+	assert.NotEqual(t, base, derived, "context should be different after adding a tracer")
+}
+
+// TestGetTracer checks that GetTracer returns the tracer NewTracer stored.
+func TestGetTracer(t *testing.T) {
+	want, ctx := NewTracer(context.Background(), "test-trace-id")
+
+	got := GetTracer(ctx)
+	assert.Equal(t, want, got, "retrieved tracer should be the same as the original")
+}
+
+// TestStartSpan checks the message, start time and context of a new span.
+func TestStartSpan(t *testing.T) {
+	tracer, ctx := NewTracer(context.Background(), "test-trace-id")
+	span, spanCtx := tracer.StartSpan(ctx, "test span %s", "1")
+
+	assert.NotNil(t, span)
+	assert.NotEqual(t, ctx, spanCtx, "context should be different after starting a span")
+	assert.Equal(t, "test span 1", span.data.Message)
+	assert.NotNil(t, span.data.StartTime)
+}
+
+// TestSpan_AddChild checks that a span started from a context carrying a
+// parent span is registered as that parent's child.
+func TestSpan_AddChild(t *testing.T) {
+	tracer, rootCtx := NewTracer(context.Background(), "test-trace-id")
+	parent, parentCtx := tracer.StartSpan(rootCtx, "parent span")
+	child, _ := tracer.StartSpan(parentCtx, "child span")
+
+	assert.Contains(t, parent.data.Children, child.data, "parent span should contain the child span")
+}
+
+// TestSpan_AddSubTrace checks that the spans of a sub trace become children
+// of the span, in order.
+func TestSpan_AddSubTrace(t *testing.T) {
+	tracer, ctx := NewTracer(context.Background(), "test-trace-id")
+	span, _ := tracer.StartSpan(ctx, "span")
+
+	span.AddSubTrace(&commonv1.Trace{
+		Spans: []*commonv1.Span{
+			{Message: "sub span 1"},
+			{Message: "sub span 2"},
+		},
+	})
+
+	assert.Equal(t, 2, len(span.data.Children), "span should contain two children")
+	for idx, want := range []string{"sub span 1", "sub span 2"} {
+		assert.Equal(t, want, span.data.Children[idx].Message)
+	}
+}
+
+// TestSpan_Tag checks that Tag and Tagf each append one tag to the span.
+func TestSpan_Tag(t *testing.T) {
+	tracer, ctx := NewTracer(context.Background(), "test-trace-id")
+	span, _ := tracer.StartSpan(ctx, "span")
+
+	span.Tag("key", "value")
+	tags := span.data.Tags
+	assert.Equal(t, 1, len(tags), "span should have one tag")
+	assert.Equal(t, "key", tags[0].Key)
+	assert.Equal(t, "value", tags[0].Value)
+
+	span.Tagf("key", "value %s", "formatted")
+	tags = span.data.Tags
+	assert.Equal(t, 2, len(tags), "span should have two tags")
+	assert.Equal(t, "key", tags[1].Key)
+	assert.Equal(t, "value formatted", tags[1].Value)
+}
+
+// TestSpan_Error checks that Error flags both the span and its tracer and
+// stores the error message as a tag.
+func TestSpan_Error(t *testing.T) {
+	tracer, ctx := NewTracer(context.Background(), "test-trace-id")
+	span, _ := tracer.StartSpan(ctx, "span")
+
+	failure := fmt.Errorf("test error")
+	span.Error(failure)
+
+	assert.True(t, span.data.Error, "span should be marked as error")
+	assert.True(t, tracer.data.Error, "tracer should be marked as error")
+	assert.Equal(t, "test error", span.data.Tags[0].Value, "error message should be added as a tag")
+}
+
+// TestSpan_Stop checks that Stop sets the end time and a positive duration.
+func TestSpan_Stop(t *testing.T) {
+	tracer, _ := NewTracer(context.Background(), "test-trace-id")
+	span, _ := tracer.StartSpan(context.Background(), "span")
+	// Let measurable time pass so the millisecond duration is non-zero.
+	time.Sleep(10 * time.Millisecond)
+
+	span.Stop()
+
+	assert.NotNil(t, span.data.EndTime, "span end time should be set")
+	assert.Greater(t, span.data.Duration, int64(0), "span duration should be greater than 0")
+}
diff --git a/pkg/test/stream/etcd.go b/pkg/test/stream/etcd.go
index b796ef01..eb9d342b 100644
--- a/pkg/test/stream/etcd.go
+++ b/pkg/test/stream/etcd.go
@@ -45,6 +45,9 @@ var (
// PreloadSchema loads schemas from files in the booting process.
func PreloadSchema(ctx context.Context, e schema.Registry) error {
+ if e == nil {
+ return nil
+ }
g := &commonv1.Group{}
if err := protojson.Unmarshal([]byte(groupJSON), g); err != nil {
return err
diff --git a/test/stress/trace/docker-compose-cluster.yaml
b/test/stress/trace/docker-compose-cluster.yaml
index 48b49e7e..ddf14221 100644
--- a/test/stress/trace/docker-compose-cluster.yaml
+++ b/test/stress/trace/docker-compose-cluster.yaml
@@ -76,6 +76,10 @@ services:
build:
dockerfile: ./docker/Dockerfile
context: ../../..
+ ports:
+ - 17913:17913
+ - 6060:6060
+ - 2121:2121
deploy:
resources:
limits:
@@ -89,7 +93,7 @@ services:
file: ../../docker/base-compose.yml
service: oap
# TODO: use the main repo image once v0.6.0 is released and merged into the main repo
- image: "ghcr.io/apache/skywalking/data-generator:${SW_OAP_COMMIT}"
+ image: "hanahmily/data-generator:${SW_OAP_COMMIT}"
environment:
SW_STORAGE: banyandb
SW_STORAGE_BANYANDB_TARGETS: "liaison:17912"