This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch bug/empty-trace
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 0e638635749b5da5432871ecc59beeb34590d51f
Author: Gao Hongtao <[email protected]>
AuthorDate: Tue Nov 18 11:38:38 2025 +0000

    Enhance query-processing tracing by tagging spans with response counts
    across services, improving observability of query results and aiding
    performance analysis.
---
 banyand/dquery/topn.go                             | 11 ++++++++-
 banyand/liaison/grpc/measure.go                    | 28 +++++++++++++++-------
 banyand/liaison/grpc/stream.go                     |  9 +++++--
 banyand/liaison/grpc/trace.go                      |  9 +++++--
 banyand/query/processor.go                         |  3 +++
 .../logical/measure/measure_plan_distributed.go    |  9 +++++++
 .../logical/stream/stream_plan_distributed.go      |  6 +++++
 pkg/query/logical/trace/trace_plan_distributed.go  |  8 +++++++
 test/cases/trace/trace.go                          |  2 +-
 9 files changed, 70 insertions(+), 15 deletions(-)

diff --git a/banyand/dquery/topn.go b/banyand/dquery/topn.go
index 80f59d34..d96b78c2 100644
--- a/banyand/dquery/topn.go
+++ b/banyand/dquery/topn.go
@@ -87,10 +87,11 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, 
message bus.Message) (resp
                resp = bus.NewMessage(now, common.NewError("no stage found"))
                return
        }
+       var span *pkgquery.Span
        if request.Trace {
                var tracer *pkgquery.Tracer
                tracer, ctx = pkgquery.NewTracer(ctx, 
n.Format(time.RFC3339Nano))
-               span, _ := tracer.StartSpan(ctx, "distributed-client")
+               span, _ = tracer.StartSpan(ctx, "distributed-client")
                span.Tag("request", 
convert.BytesToString(logger.Proto(request)))
                span.Tagf("nodeSelectors", "%v", nodeSelectors)
                defer func() {
@@ -118,6 +119,7 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, 
message bus.Message) (resp
        aggregator := query.CreateTopNPostAggregator(request.GetTopN(),
                agg, request.GetFieldValueSort())
        var tags []string
+       var responseCount int
        for _, f := range ff {
                if m, getErr := f.Get(); getErr != nil {
                        allErr = multierr.Append(allErr, getErr)
@@ -126,6 +128,7 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, 
message bus.Message) (resp
                        if d == nil {
                                continue
                        }
+                       responseCount++
                        topNResp := d.(*measurev1.TopNResponse)
                        for _, l := range topNResp.Lists {
                                for _, tn := range l.Items {
@@ -144,6 +147,9 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, 
message bus.Message) (resp
                        }
                }
        }
+       if span != nil {
+               span.Tagf("response_count", "%d", responseCount)
+       }
        if allErr != nil {
                resp = bus.NewMessage(now, common.NewError("execute the query 
%s: %v", request.GetName(), allErr))
                return
@@ -153,6 +159,9 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, 
message bus.Message) (resp
                return
        }
        lists := aggregator.Val(tags)
+       if span != nil {
+               span.Tagf("list_count", "%d", len(lists))
+       }
        resp = bus.NewMessage(now, &measurev1.TopNResponse{
                Lists: lists,
        })
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index 9cb32f9d..17291452 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -297,15 +297,19 @@ func (ms *measureService) Query(ctx context.Context, req 
*measurev1.QueryRequest
                return nil, status.Errorf(codes.InvalidArgument, "%v is invalid 
:%s", req.GetTimeRange(), err)
        }
        now := time.Now()
+       var tracer *query.Tracer
+       var span *query.Span
+       var responseDataPointCount int
        if req.Trace {
-               tracer, _ := query.NewTracer(ctx, now.Format(time.RFC3339Nano))
-               span, _ := tracer.StartSpan(ctx, "measure-grpc")
+               tracer, _ = query.NewTracer(ctx, now.Format(time.RFC3339Nano))
+               span, _ = tracer.StartSpan(ctx, "measure-grpc")
                span.Tag("request", convert.BytesToString(logger.Proto(req)))
                defer func() {
                        if err != nil {
                                span.Error(err)
                                span.Stop()
                        } else {
+                               span.Tagf("response_data_point_count", "%d", 
responseDataPointCount)
                                span.AddSubTrace(resp.Trace)
                                span.Stop()
                                resp.Trace = tracer.ToProto()
@@ -326,6 +330,7 @@ func (ms *measureService) Query(ctx context.Context, req 
*measurev1.QueryRequest
        data := msg.Data()
        switch d := data.(type) {
        case *measurev1.QueryResponse:
+               responseDataPointCount = len(d.DataPoints)
                return d, nil
        case *common.Error:
                return nil, errors.WithMessage(errQueryMsg, d.Error())
@@ -348,18 +353,22 @@ func (ms *measureService) TopN(ctx context.Context, 
topNRequest *measurev1.TopNR
                return nil, status.Errorf(codes.InvalidArgument, "%v is invalid 
:%s", topNRequest.GetTimeRange(), err)
        }
        now := time.Now()
+       var topNTracer *query.Tracer
+       var topNSpan *query.Span
+       var responseListCount int
        if topNRequest.Trace {
-               tracer, _ := query.NewTracer(ctx, now.Format(time.RFC3339Nano))
-               span, _ := tracer.StartSpan(ctx, "topn-grpc")
-               span.Tag("request", 
convert.BytesToString(logger.Proto(topNRequest)))
+               topNTracer, _ = query.NewTracer(ctx, 
now.Format(time.RFC3339Nano))
+               topNSpan, _ = topNTracer.StartSpan(ctx, "topn-grpc")
+               topNSpan.Tag("request", 
convert.BytesToString(logger.Proto(topNRequest)))
                defer func() {
                        if err != nil {
-                               span.Error(err)
+                               topNSpan.Error(err)
                        } else {
-                               span.AddSubTrace(resp.Trace)
-                               resp.Trace = tracer.ToProto()
+                               topNSpan.Tagf("response_list_count", "%d", 
responseListCount)
+                               topNSpan.AddSubTrace(resp.Trace)
+                               resp.Trace = topNTracer.ToProto()
                        }
-                       span.Stop()
+                       topNSpan.Stop()
                }()
        }
        message := bus.NewMessage(bus.MessageID(now.UnixNano()), topNRequest)
@@ -374,6 +383,7 @@ func (ms *measureService) TopN(ctx context.Context, 
topNRequest *measurev1.TopNR
        data := msg.Data()
        switch d := data.(type) {
        case *measurev1.TopNResponse:
+               responseListCount = len(d.Lists)
                return d, nil
        case *common.Error:
                return nil, errors.WithMessage(errQueryMsg, d.Error())
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index c19b6038..d0da8c6f 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -279,15 +279,19 @@ func (s *streamService) Query(ctx context.Context, req 
*streamv1.QueryRequest) (
                return nil, status.Errorf(codes.InvalidArgument, "%v is invalid 
:%s", req.GetTimeRange(), err)
        }
        now := time.Now()
+       var tracer *query.Tracer
+       var span *query.Span
+       var responseElementCount int
        if req.Trace {
-               tracer, _ := query.NewTracer(ctx, now.Format(time.RFC3339Nano))
-               span, _ := tracer.StartSpan(ctx, "stream-grpc")
+               tracer, _ = query.NewTracer(ctx, now.Format(time.RFC3339Nano))
+               span, _ = tracer.StartSpan(ctx, "stream-grpc")
                span.Tag("request", convert.BytesToString(logger.Proto(req)))
                defer func() {
                        if err != nil {
                                span.Error(err)
                                span.Stop()
                        } else {
+                               span.Tagf("response_element_count", "%d", 
responseElementCount)
                                span.AddSubTrace(resp.Trace)
                                span.Stop()
                                resp.Trace = tracer.ToProto()
@@ -309,6 +313,7 @@ func (s *streamService) Query(ctx context.Context, req 
*streamv1.QueryRequest) (
        data := msg.Data()
        switch d := data.(type) {
        case *streamv1.QueryResponse:
+               responseElementCount = len(d.Elements)
                return d, nil
        case *common.Error:
                return nil, errors.WithMessage(errQueryMsg, d.Error())
diff --git a/banyand/liaison/grpc/trace.go b/banyand/liaison/grpc/trace.go
index 81000319..c4989652 100644
--- a/banyand/liaison/grpc/trace.go
+++ b/banyand/liaison/grpc/trace.go
@@ -342,15 +342,19 @@ func (s *traceService) Query(ctx context.Context, req 
*tracev1.QueryRequest) (re
                return nil, status.Errorf(codes.InvalidArgument, "%v is invalid 
:%s", req.GetTimeRange(), err)
        }
        now := time.Now()
+       var tracer *query.Tracer
+       var span *query.Span
+       var responseTraceCount int
        if req.Trace {
-               tracer, _ := query.NewTracer(ctx, now.Format(time.RFC3339Nano))
-               span, _ := tracer.StartSpan(ctx, "trace-grpc")
+               tracer, _ = query.NewTracer(ctx, now.Format(time.RFC3339Nano))
+               span, _ = tracer.StartSpan(ctx, "trace-grpc")
                span.Tag("request", convert.BytesToString(logger.Proto(req)))
                defer func() {
                        if err != nil {
                                span.Error(err)
                                span.Stop()
                        } else if resp != nil && resp != 
emptyTraceQueryResponse {
+                               span.Tagf("response_trace_count", "%d", 
responseTraceCount)
                                span.AddSubTrace(resp.TraceQueryResult)
                                span.Stop()
                                resp.TraceQueryResult = tracer.ToProto()
@@ -381,6 +385,7 @@ func (s *traceService) Query(ctx context.Context, req 
*tracev1.QueryRequest) (re
                        }
                        traces = append(traces, trace)
                }
+               responseTraceCount = len(traces)
                return &tracev1.QueryResponse{
                        Traces:           traces,
                        TraceQueryResult: d.TraceQueryResult,
diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index 65808e25..388ac0b5 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -123,6 +123,7 @@ func (p *streamQueryProcessor) Rev(ctx context.Context, 
message bus.Message) (re
                        data := resp.Data()
                        switch d := data.(type) {
                        case *streamv1.QueryResponse:
+                               span.Tag("resp_count", fmt.Sprintf("%d", 
len(d.Elements)))
                                span.Stop()
                                d.Trace = tracer.ToProto()
                        case *common.Error:
@@ -272,6 +273,7 @@ func (p *measureQueryProcessor) executeQuery(ctx 
context.Context, queryCriteria
                        data := resp.Data()
                        switch d := data.(type) {
                        case *measurev1.QueryResponse:
+                               span.Tag("resp_count", fmt.Sprintf("%d", 
len(d.DataPoints)))
                                d.Trace = tracer.ToProto()
                                span.Stop()
                        case *common.Error:
@@ -551,6 +553,7 @@ func (tm *traceMonitor) finishTrace(resp *bus.Message, 
messageID int64) {
        data := resp.Data()
        switch d := data.(type) {
        case *tracev1.InternalQueryResponse:
+               tm.span.Tag("resp_count", fmt.Sprintf("%d", 
len(d.InternalTraces)))
                tm.span.Stop()
                d.TraceQueryResult = tm.tracer.ToProto()
        case *common.Error:
diff --git a/pkg/query/logical/measure/measure_plan_distributed.go 
b/pkg/query/logical/measure/measure_plan_distributed.go
index 10d25248..bd3ebfcc 100644
--- a/pkg/query/logical/measure/measure_plan_distributed.go
+++ b/pkg/query/logical/measure/measure_plan_distributed.go
@@ -262,6 +262,8 @@ func (t *distributedPlan) Execute(ctx context.Context) (mi 
executor.MIterator, e
        }
        var see []sort.Iterator[*comparableDataPoint]
        var pushedDownAggDps []*measurev1.DataPoint
+       var responseCount int
+       var dataPointCount int
        for _, f := range ff {
                if m, getErr := f.Get(); getErr != nil {
                        err = multierr.Append(err, getErr)
@@ -271,18 +273,25 @@ func (t *distributedPlan) Execute(ctx context.Context) 
(mi executor.MIterator, e
                                continue
                        }
                        resp := d.(*measurev1.QueryResponse)
+                       responseCount++
                        if span != nil {
                                span.AddSubTrace(resp.Trace)
                        }
                        if t.needCompletePushDownAgg {
                                pushedDownAggDps = append(pushedDownAggDps, 
resp.DataPoints...)
+                               dataPointCount += len(resp.DataPoints)
                                continue
                        }
+                       dataPointCount += len(resp.DataPoints)
                        see = append(see,
                                newSortableElements(resp.DataPoints,
                                        t.sortByTime, t.sortTagSpec))
                }
        }
+       if span != nil {
+               span.Tagf("response_count", "%d", responseCount)
+               span.Tagf("data_point_count", "%d", dataPointCount)
+       }
        if t.needCompletePushDownAgg {
                return &pushedDownAggregatedIterator{dataPoints: 
pushedDownAggDps}, err
        }
diff --git a/pkg/query/logical/stream/stream_plan_distributed.go 
b/pkg/query/logical/stream/stream_plan_distributed.go
index 7827f56f..b9937765 100644
--- a/pkg/query/logical/stream/stream_plan_distributed.go
+++ b/pkg/query/logical/stream/stream_plan_distributed.go
@@ -159,6 +159,7 @@ func (t *distributedPlan) Execute(ctx context.Context) (ee 
[]*streamv1.Element,
        }
        var allErr error
        var see []sort.Iterator[*comparableElement]
+       var responseCount int
        for _, f := range ff {
                if m, getErr := f.Get(); getErr != nil {
                        allErr = multierr.Append(allErr, getErr)
@@ -168,6 +169,7 @@ func (t *distributedPlan) Execute(ctx context.Context) (ee 
[]*streamv1.Element,
                                continue
                        }
                        resp := d.(*streamv1.QueryResponse)
+                       responseCount++
                        if span != nil {
                                span.AddSubTrace(resp.Trace)
                        }
@@ -185,6 +187,10 @@ func (t *distributedPlan) Execute(ctx context.Context) (ee 
[]*streamv1.Element,
                        result = append(result, element)
                }
        }
+       if span != nil {
+               span.Tagf("response_count", "%d", responseCount)
+               span.Tagf("element_id_count", "%d", len(seen))
+       }
 
        return result, allErr
 }
diff --git a/pkg/query/logical/trace/trace_plan_distributed.go 
b/pkg/query/logical/trace/trace_plan_distributed.go
index f9e8e960..bedc26ae 100644
--- a/pkg/query/logical/trace/trace_plan_distributed.go
+++ b/pkg/query/logical/trace/trace_plan_distributed.go
@@ -155,6 +155,7 @@ func (p *distributedPlan) Execute(ctx context.Context) 
(iter.Iterator[model.Trac
                return iter.Empty[model.TraceResult](), err
        }
        var allErr error
+       var responseCount int
        var st []sort.Iterator[*comparableTrace]
        for _, f := range ff {
                if m, getErr := f.Get(); getErr != nil {
@@ -165,6 +166,7 @@ func (p *distributedPlan) Execute(ctx context.Context) 
(iter.Iterator[model.Trac
                                continue
                        }
                        resp := d.(*tracev1.InternalQueryResponse)
+                       responseCount++
                        if span != nil {
                                span.AddSubTrace(resp.TraceQueryResult)
                        }
@@ -172,6 +174,9 @@ func (p *distributedPlan) Execute(ctx context.Context) 
(iter.Iterator[model.Trac
                                newSortableTraces(resp.InternalTraces, 
p.sortByTraceID))
                }
        }
+       if span != nil {
+               span.Tagf("response_count", "%d", responseCount)
+       }
        sortIter := sort.NewItemIter(st, p.desc)
        var result []*tracev1.InternalTrace
        seen := make(map[string]*tracev1.InternalTrace)
@@ -195,6 +200,9 @@ func (p *distributedPlan) Execute(ctx context.Context) 
(iter.Iterator[model.Trac
                        }
                }
        }
+       if span != nil {
+               span.Tagf("trace_id_count", "%d", len(seen))
+       }
 
        return &distributedTraceResultIterator{
                traces: result,
diff --git a/test/cases/trace/trace.go b/test/cases/trace/trace.go
index fdf141f1..1be7149c 100644
--- a/test/cases/trace/trace.go
+++ b/test/cases/trace/trace.go
@@ -54,7 +54,7 @@ var _ = g.DescribeTable("Scanning Traces", func(args 
helpers.Args) {
        g.Entry("filter by trace id and service unknown", helpers.Args{Input: 
"eq_trace_id_and_service_unknown", Duration: 1 * time.Hour, WantEmpty: true}),
        g.Entry("filter by query", helpers.Args{Input: "having_query_tag", 
Duration: 1 * time.Hour}),
        g.Entry("err in arr", helpers.Args{Input: "err_in_arr", Duration: 1 * 
time.Hour, WantErr: true}),
-       g.Entry("filter by query with having condition", helpers.Args{Input: 
"having_query_tag_cond", Want: "having_query_tag", Duration: 1 * time.Hour}),
+       g.FEntry("filter by query with having condition", helpers.Args{Input: 
"having_query_tag_cond", Want: "having_query_tag", Duration: 1 * time.Hour}),
        g.Entry("multi-groups: unchanged tags", helpers.Args{Input: 
"multi_group_unchanged", Duration: 1 * time.Hour}),
        g.Entry("multi-groups: new tag", helpers.Args{Input: 
"multi_group_new_tag", Duration: 1 * time.Hour}),
        g.Entry("multi-groups: tag type change", helpers.Args{Input: 
"multi_group_tag_type", Duration: 1 * time.Hour}),

Reply via email to