This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch bug/empty-trace in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 0e638635749b5da5432871ecc59beeb34590d51f Author: Gao Hongtao <[email protected]> AuthorDate: Tue Nov 18 11:38:38 2025 +0000 Enhance tracing in query processing by adding response count metrics across various services, improving observability of query results and performance analysis. --- banyand/dquery/topn.go | 11 ++++++++- banyand/liaison/grpc/measure.go | 28 +++++++++++++++------- banyand/liaison/grpc/stream.go | 9 +++++-- banyand/liaison/grpc/trace.go | 9 +++++-- banyand/query/processor.go | 3 +++ .../logical/measure/measure_plan_distributed.go | 9 +++++++ .../logical/stream/stream_plan_distributed.go | 6 +++++ pkg/query/logical/trace/trace_plan_distributed.go | 8 +++++++ test/cases/trace/trace.go | 2 +- 9 files changed, 70 insertions(+), 15 deletions(-) diff --git a/banyand/dquery/topn.go b/banyand/dquery/topn.go index 80f59d34..d96b78c2 100644 --- a/banyand/dquery/topn.go +++ b/banyand/dquery/topn.go @@ -87,10 +87,11 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp resp = bus.NewMessage(now, common.NewError("no stage found")) return } + var span *pkgquery.Span if request.Trace { var tracer *pkgquery.Tracer tracer, ctx = pkgquery.NewTracer(ctx, n.Format(time.RFC3339Nano)) - span, _ := tracer.StartSpan(ctx, "distributed-client") + span, _ = tracer.StartSpan(ctx, "distributed-client") span.Tag("request", convert.BytesToString(logger.Proto(request))) span.Tagf("nodeSelectors", "%v", nodeSelectors) defer func() { @@ -118,6 +119,7 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp aggregator := query.CreateTopNPostAggregator(request.GetTopN(), agg, request.GetFieldValueSort()) var tags []string + var responseCount int for _, f := range ff { if m, getErr := f.Get(); getErr != nil { allErr = multierr.Append(allErr, getErr) @@ -126,6 +128,7 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp if d == nil { continue } + responseCount++ topNResp := d.(*measurev1.TopNResponse) for _, l := range topNResp.Lists { for _, tn := range l.Items { @@ -144,6 +147,9 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp } } } + if span != nil { + span.Tagf("response_count", "%d", responseCount) + } if allErr != nil { resp = bus.NewMessage(now, common.NewError("execute the query %s: %v", request.GetName(), allErr)) return @@ -153,6 +159,9 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp return } lists := aggregator.Val(tags) + if span != nil { + span.Tagf("list_count", "%d", len(lists)) + } resp = bus.NewMessage(now, &measurev1.TopNResponse{ Lists: lists, }) diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go index 9cb32f9d..17291452 100644 --- a/banyand/liaison/grpc/measure.go +++ b/banyand/liaison/grpc/measure.go @@ -297,15 +297,19 @@ func (ms *measureService) Query(ctx context.Context, req *measurev1.QueryRequest return nil, status.Errorf(codes.InvalidArgument, "%v is invalid :%s", req.GetTimeRange(), err) } now := time.Now() + var tracer *query.Tracer + var span *query.Span + var responseDataPointCount int if req.Trace { - tracer, _ := query.NewTracer(ctx, now.Format(time.RFC3339Nano)) - span, _ := tracer.StartSpan(ctx, "measure-grpc") + tracer, _ = query.NewTracer(ctx, now.Format(time.RFC3339Nano)) + span, _ = tracer.StartSpan(ctx, "measure-grpc") span.Tag("request", convert.BytesToString(logger.Proto(req))) defer func() { if err != nil { span.Error(err) span.Stop() } else { + span.Tagf("response_data_point_count", "%d", responseDataPointCount) span.AddSubTrace(resp.Trace) span.Stop() resp.Trace = tracer.ToProto() @@ -326,6 +330,7 @@ func (ms *measureService) Query(ctx context.Context, req *measurev1.QueryRequest data := msg.Data() switch d := data.(type) { case *measurev1.QueryResponse: + responseDataPointCount = len(d.DataPoints) return d, nil case *common.Error: return nil, errors.WithMessage(errQueryMsg, d.Error()) @@ -348,18 +353,22 @@ func (ms *measureService) TopN(ctx context.Context, topNRequest *measurev1.TopNR return nil, status.Errorf(codes.InvalidArgument, "%v is invalid :%s", topNRequest.GetTimeRange(), err) } now := time.Now() + var topNTracer *query.Tracer + var topNSpan *query.Span + var responseListCount int if topNRequest.Trace { - tracer, _ := query.NewTracer(ctx, now.Format(time.RFC3339Nano)) - span, _ := tracer.StartSpan(ctx, "topn-grpc") - span.Tag("request", convert.BytesToString(logger.Proto(topNRequest))) + topNTracer, _ = query.NewTracer(ctx, now.Format(time.RFC3339Nano)) + topNSpan, _ = topNTracer.StartSpan(ctx, "topn-grpc") + topNSpan.Tag("request", convert.BytesToString(logger.Proto(topNRequest))) defer func() { if err != nil { - span.Error(err) + topNSpan.Error(err) } else { - span.AddSubTrace(resp.Trace) - resp.Trace = tracer.ToProto() + topNSpan.Tagf("response_list_count", "%d", responseListCount) + topNSpan.AddSubTrace(resp.Trace) + resp.Trace = topNTracer.ToProto() } - span.Stop() + topNSpan.Stop() }() } message := bus.NewMessage(bus.MessageID(now.UnixNano()), topNRequest) @@ -374,6 +383,7 @@ func (ms *measureService) TopN(ctx context.Context, topNRequest *measurev1.TopNR data := msg.Data() switch d := data.(type) { case *measurev1.TopNResponse: + responseListCount = len(d.Lists) return d, nil case *common.Error: return nil, errors.WithMessage(errQueryMsg, d.Error()) diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go index c19b6038..d0da8c6f 100644 --- a/banyand/liaison/grpc/stream.go +++ b/banyand/liaison/grpc/stream.go @@ -279,15 +279,19 @@ func (s *streamService) Query(ctx context.Context, req *streamv1.QueryRequest) ( return nil, status.Errorf(codes.InvalidArgument, "%v is invalid :%s", req.GetTimeRange(), err) } now := time.Now() + var tracer *query.Tracer + var span *query.Span + var responseElementCount int if req.Trace { - tracer, _ := query.NewTracer(ctx, now.Format(time.RFC3339Nano)) - span, _ := tracer.StartSpan(ctx, "stream-grpc") + tracer, _ = query.NewTracer(ctx, now.Format(time.RFC3339Nano)) + span, _ = tracer.StartSpan(ctx, "stream-grpc") span.Tag("request", convert.BytesToString(logger.Proto(req))) defer func() { if err != nil { span.Error(err) span.Stop() } else { + span.Tagf("response_element_count", "%d", responseElementCount) span.AddSubTrace(resp.Trace) span.Stop() resp.Trace = tracer.ToProto() @@ -309,6 +313,7 @@ func (s *streamService) Query(ctx context.Context, req *streamv1.QueryRequest) ( data := msg.Data() switch d := data.(type) { case *streamv1.QueryResponse: + responseElementCount = len(d.Elements) return d, nil case *common.Error: return nil, errors.WithMessage(errQueryMsg, d.Error()) diff --git a/banyand/liaison/grpc/trace.go b/banyand/liaison/grpc/trace.go index 81000319..c4989652 100644 --- a/banyand/liaison/grpc/trace.go +++ b/banyand/liaison/grpc/trace.go @@ -342,15 +342,19 @@ func (s *traceService) Query(ctx context.Context, req *tracev1.QueryRequest) (re return nil, status.Errorf(codes.InvalidArgument, "%v is invalid :%s", req.GetTimeRange(), err) } now := time.Now() + var tracer *query.Tracer + var span *query.Span + var responseTraceCount int if req.Trace { - tracer, _ := query.NewTracer(ctx, now.Format(time.RFC3339Nano)) - span, _ := tracer.StartSpan(ctx, "trace-grpc") + tracer, _ = query.NewTracer(ctx, now.Format(time.RFC3339Nano)) + span, _ = tracer.StartSpan(ctx, "trace-grpc") span.Tag("request", convert.BytesToString(logger.Proto(req))) defer func() { if err != nil { span.Error(err) span.Stop() } else if resp != nil && resp != emptyTraceQueryResponse { + span.Tagf("response_trace_count", "%d", responseTraceCount) span.AddSubTrace(resp.TraceQueryResult) span.Stop() resp.TraceQueryResult = tracer.ToProto() @@ -381,6 +385,7 @@ func (s *traceService) Query(ctx context.Context, req *tracev1.QueryRequest) (re } traces = append(traces, trace) } + responseTraceCount = len(traces) return &tracev1.QueryResponse{ Traces: traces, TraceQueryResult: d.TraceQueryResult, diff --git a/banyand/query/processor.go b/banyand/query/processor.go index 65808e25..388ac0b5 100644 --- a/banyand/query/processor.go +++ b/banyand/query/processor.go @@ -123,6 +123,7 @@ func (p *streamQueryProcessor) Rev(ctx context.Context, message bus.Message) (re data := resp.Data() switch d := data.(type) { case *streamv1.QueryResponse: + span.Tag("resp_count", fmt.Sprintf("%d", len(d.Elements))) span.Stop() d.Trace = tracer.ToProto() case *common.Error: @@ -272,6 +273,7 @@ func (p *measureQueryProcessor) executeQuery(ctx context.Context, queryCriteria data := resp.Data() switch d := data.(type) { case *measurev1.QueryResponse: + span.Tag("resp_count", fmt.Sprintf("%d", len(d.DataPoints))) d.Trace = tracer.ToProto() span.Stop() case *common.Error: @@ -551,6 +553,7 @@ func (tm *traceMonitor) finishTrace(resp *bus.Message, messageID int64) { data := resp.Data() switch d := data.(type) { case *tracev1.InternalQueryResponse: + tm.span.Tag("resp_count", fmt.Sprintf("%d", len(d.InternalTraces))) tm.span.Stop() d.TraceQueryResult = tm.tracer.ToProto() case *common.Error: diff --git a/pkg/query/logical/measure/measure_plan_distributed.go b/pkg/query/logical/measure/measure_plan_distributed.go index 10d25248..bd3ebfcc 100644 --- a/pkg/query/logical/measure/measure_plan_distributed.go +++ b/pkg/query/logical/measure/measure_plan_distributed.go @@ -262,6 +262,8 @@ func (t *distributedPlan) Execute(ctx context.Context) (mi executor.MIterator, e } var see []sort.Iterator[*comparableDataPoint] var pushedDownAggDps []*measurev1.DataPoint + var responseCount int + var dataPointCount int for _, f := range ff { if m, getErr := f.Get(); getErr != nil { err = multierr.Append(err, getErr) @@ -271,18 +273,25 @@ func (t *distributedPlan) Execute(ctx context.Context) (mi executor.MIterator, e continue } resp := d.(*measurev1.QueryResponse) + responseCount++ if span != nil { span.AddSubTrace(resp.Trace) } if t.needCompletePushDownAgg { pushedDownAggDps = append(pushedDownAggDps, resp.DataPoints...) + dataPointCount += len(resp.DataPoints) continue } + dataPointCount += len(resp.DataPoints) see = append(see, newSortableElements(resp.DataPoints, t.sortByTime, t.sortTagSpec)) } } + if span != nil { + span.Tagf("response_count", "%d", responseCount) + span.Tagf("data_point_count", "%d", dataPointCount) + } if t.needCompletePushDownAgg { return &pushedDownAggregatedIterator{dataPoints: pushedDownAggDps}, err } diff --git a/pkg/query/logical/stream/stream_plan_distributed.go b/pkg/query/logical/stream/stream_plan_distributed.go index 7827f56f..b9937765 100644 --- a/pkg/query/logical/stream/stream_plan_distributed.go +++ b/pkg/query/logical/stream/stream_plan_distributed.go @@ -159,6 +159,7 @@ func (t *distributedPlan) Execute(ctx context.Context) (ee []*streamv1.Element, } var allErr error var see []sort.Iterator[*comparableElement] + var responseCount int for _, f := range ff { if m, getErr := f.Get(); getErr != nil { allErr = multierr.Append(allErr, getErr) @@ -168,6 +169,7 @@ func (t *distributedPlan) Execute(ctx context.Context) (ee []*streamv1.Element, continue } resp := d.(*streamv1.QueryResponse) + responseCount++ if span != nil { span.AddSubTrace(resp.Trace) } @@ -185,6 +187,10 @@ func (t *distributedPlan) Execute(ctx context.Context) (ee []*streamv1.Element, result = append(result, element) } } + if span != nil { + span.Tagf("response_count", "%d", responseCount) + span.Tagf("element_id_count", "%d", len(seen)) + } return result, allErr } diff --git a/pkg/query/logical/trace/trace_plan_distributed.go b/pkg/query/logical/trace/trace_plan_distributed.go index f9e8e960..bedc26ae 100644 --- a/pkg/query/logical/trace/trace_plan_distributed.go +++ b/pkg/query/logical/trace/trace_plan_distributed.go @@ -155,6 +155,7 @@ func (p *distributedPlan) Execute(ctx context.Context) (iter.Iterator[model.Trac return iter.Empty[model.TraceResult](), err } var allErr error + var responseCount int var st []sort.Iterator[*comparableTrace] for _, f := range ff { if m, getErr := f.Get(); getErr != nil { @@ -165,6 +166,7 @@ func (p *distributedPlan) Execute(ctx context.Context) (iter.Iterator[model.Trac continue } resp := d.(*tracev1.InternalQueryResponse) + responseCount++ if span != nil { span.AddSubTrace(resp.TraceQueryResult) } @@ -172,6 +174,9 @@ func (p *distributedPlan) Execute(ctx context.Context) (iter.Iterator[model.Trac newSortableTraces(resp.InternalTraces, p.sortByTraceID)) } } + if span != nil { + span.Tagf("response_count", "%d", responseCount) + } sortIter := sort.NewItemIter(st, p.desc) var result []*tracev1.InternalTrace seen := make(map[string]*tracev1.InternalTrace) @@ -195,6 +200,9 @@ func (p *distributedPlan) Execute(ctx context.Context) (iter.Iterator[model.Trac } } } + if span != nil { + span.Tagf("trace_id_count", "%d", len(seen)) + } return &distributedTraceResultIterator{ traces: result, diff --git a/test/cases/trace/trace.go b/test/cases/trace/trace.go index fdf141f1..1be7149c 100644 --- a/test/cases/trace/trace.go +++ b/test/cases/trace/trace.go @@ -54,7 +54,7 @@ var _ = g.DescribeTable("Scanning Traces", func(args helpers.Args) { g.Entry("filter by trace id and service unknown", helpers.Args{Input: "eq_trace_id_and_service_unknown", Duration: 1 * time.Hour, WantEmpty: true}), g.Entry("filter by query", helpers.Args{Input: "having_query_tag", Duration: 1 * time.Hour}), g.Entry("err in arr", helpers.Args{Input: "err_in_arr", Duration: 1 * time.Hour, WantErr: true}), - g.Entry("filter by query with having condition", helpers.Args{Input: "having_query_tag_cond", Want: "having_query_tag", Duration: 1 * time.Hour}), + g.FEntry("filter by query with having condition", helpers.Args{Input: "having_query_tag_cond", Want: "having_query_tag", Duration: 1 * time.Hour}), g.Entry("multi-groups: unchanged tags", helpers.Args{Input: "multi_group_unchanged", Duration: 1 * time.Hour}), g.Entry("multi-groups: new tag", helpers.Args{Input: "multi_group_new_tag", Duration: 1 * time.Hour}), g.Entry("multi-groups: tag type change", helpers.Args{Input: "multi_group_tag_type", Duration: 1 * time.Hour}),
