This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch bug/empty-trace
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit c64bd69ab9ef6365921469616becda941b0b3211
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon Nov 17 13:48:57 2025 +0000

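    Enhance query result tracing by recording aggregated metrics for cursor
    and result events. Add startQueryResultSpan, which opens a "query-result"
    span and returns cursor/result recorders plus a finish function, and call
    that finish function from queryResult.Release, improving observability of
    query operations.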
---
 banyand/trace/query.go   | 29 +++++++++++++++---
 banyand/trace/tracing.go | 79 ++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 104 insertions(+), 4 deletions(-)

diff --git a/banyand/trace/query.go b/banyand/trace/query.go
index 8aecc80f..5528f8a2 100644
--- a/banyand/trace/query.go
+++ b/banyand/trace/query.go
@@ -101,6 +101,7 @@ func (t *trace) Query(ctx context.Context, tqo 
model.TraceQueryOptions) (model.T
        pipelineCtx, cancel := context.WithTimeout(ctx, queryTimeout)
        result.ctx = pipelineCtx
        result.cancel = cancel
+       result.recordCursor, result.recordResult, result.finishResultSpan = 
startQueryResultSpan(pipelineCtx)
 
        if result.keys == nil {
                result.keys = make(map[string]int64)
@@ -244,17 +245,20 @@ func (t *trace) prepareSIDXStreaming(
 type queryResult struct {
        ctx                 context.Context
        err                 error
-       currentBatch        *scanBatch
-       tagProjection       *model.TagProjection
+       streamDone          <-chan struct{}
+       recordCursor        func(*blockCursor)
        keys                map[string]int64
        cursorBatchCh       <-chan *scanBatch
        cancel              context.CancelFunc
        currentCursorGroups map[string][]*blockCursor
-       streamDone          <-chan struct{}
+       currentBatch        *scanBatch
+       tagProjection       *model.TagProjection
+       finishResultSpan    func(int, error)
+       recordResult        func(*model.TraceResult)
        currentTraceIDs     []string
        segments            []storage.Segment[*tsTable, option]
-       currentIndex        int
        hit                 int
+       currentIndex        int
 }
 
 func (qr *queryResult) Pull() *model.TraceResult {
@@ -313,6 +317,10 @@ func (qr *queryResult) Pull() *model.TraceResult {
                qr.currentIndex++
                delete(qr.currentCursorGroups, traceID)
 
+               if qr.recordResult != nil {
+                       qr.recordResult(result)
+               }
+
                return result
        }
 }
@@ -367,6 +375,9 @@ func (qr *queryResult) ensureCurrentBatch() bool {
                                                return true
                                        }
                                        if result.cursor != nil {
+                                               if qr.recordCursor != nil {
+                                                       
qr.recordCursor(result.cursor)
+                                               }
                                                traceID := 
result.cursor.bm.traceID
                                                qr.currentCursorGroups[traceID] 
= append(qr.currentCursorGroups[traceID], result.cursor)
                                        }
@@ -498,6 +509,16 @@ func (qr *queryResult) Release() {
                qr.segments[i].DecRef()
        }
        qr.segments = qr.segments[:0]
+
+       qr.finishTracing(qr.err)
+}
+
+func (qr *queryResult) finishTracing(err error) {
+       if qr.finishResultSpan == nil {
+               return
+       }
+       qr.finishResultSpan(qr.hit, err)
+       qr.finishResultSpan = nil
 }
 
 func mustDecodeTagValue(valueType pbv1.ValueType, value []byte) 
*modelv1.TagValue {
diff --git a/banyand/trace/tracing.go b/banyand/trace/tracing.go
index f3ed8ac2..69d2a252 100644
--- a/banyand/trace/tracing.go
+++ b/banyand/trace/tracing.go
@@ -28,6 +28,7 @@ import (
        "github.com/dustin/go-humanize"
 
        "github.com/apache/skywalking-banyandb/pkg/query"
+       "github.com/apache/skywalking-banyandb/pkg/query/model"
 )
 
 const (
@@ -354,3 +355,81 @@ func startAggregatedBlockScanSpan(ctx context.Context, 
groupedIDs [][]string, pa
                        span.Stop()
                }
 }
+
+// startQueryResultSpan records aggregated metrics for cursor consumption and 
trace results.
+// It returns recorders for cursor/result events and a finish function to 
complete the span.
+func startQueryResultSpan(ctx context.Context) (func(*blockCursor), 
func(*model.TraceResult), func(int, error)) {
+       tracer := query.GetTracer(ctx)
+       if tracer == nil {
+               return nil, nil, nil
+       }
+
+       span, _ := tracer.StartSpan(ctx, "query-result")
+
+       var (
+               cursorCount     int
+               cursorBytes     uint64
+               cursorTraceStat = make(map[string]int)
+               cursorSamples   []string
+
+               resultRecorded  int
+               resultTraceStat = make(map[string]int)
+               resultSamples   []string
+       )
+
+       recordCursor := func(bc *blockCursor) {
+               if bc == nil {
+                       return
+               }
+               cursorCount++
+               cursorBytes += bc.bm.uncompressedSpanSizeBytes
+               cursorTraceStat[bc.bm.traceID]++
+               if len(cursorSamples) < traceIDSampleLimit {
+                       cursorSamples = append(cursorSamples, 
fmt.Sprintf("%s:%d", bc.bm.traceID, bc.bm.count))
+               }
+       }
+
+       recordResult := func(res *model.TraceResult) {
+               if res == nil || res.Error != nil || res.TID == "" {
+                       return
+               }
+               resultRecorded++
+               resultTraceStat[res.TID]++
+               if len(resultSamples) < traceIDSampleLimit {
+                       resultSamples = append(resultSamples, 
fmt.Sprintf("%s:%d", res.TID, len(res.Spans)))
+               }
+       }
+
+       finish := func(hit int, err error) {
+               span.Tag("cursor_total", strconv.Itoa(cursorCount))
+               if cursorBytes > 0 {
+                       span.Tag("cursor_bytes", humanize.Bytes(cursorBytes))
+               }
+               span.Tag("cursor_trace_total", 
strconv.Itoa(len(cursorTraceStat)))
+               if len(cursorSamples) > 0 {
+                       span.Tag("cursor_sample", strings.Join(cursorSamples, 
","))
+               }
+
+               span.Tag("result_recorded", strconv.Itoa(resultRecorded))
+               span.Tag("result_trace_total", 
strconv.Itoa(len(resultTraceStat)))
+               if len(resultSamples) > 0 {
+                       span.Tag("result_sample", strings.Join(resultSamples, 
","))
+               }
+
+               span.Tag("result_hit_count", strconv.Itoa(hit))
+               if resultRecorded != hit {
+                       span.Tag("result_hit_mismatch", 
strconv.Itoa(resultRecorded-hit))
+               }
+               traceGap := len(cursorTraceStat) - len(resultTraceStat)
+               if traceGap > 0 {
+                       span.Tag("result_trace_gap", strconv.Itoa(traceGap))
+               }
+
+               if err != nil {
+                       span.Error(err)
+               }
+               span.Stop()
+       }
+
+       return recordCursor, recordResult, finish
+}

Reply via email to