This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch bug/empty-trace
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit c64bd69ab9ef6365921469616becda941b0b3211
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon Nov 17 13:48:57 2025 +0000

    Enhance query result tracing by adding metrics recording for cursor and result events. Introduce functions to start and finish tracing spans, improving observability of query operations.
---
 banyand/trace/query.go   | 29 +++++++++++++++---
 banyand/trace/tracing.go | 79 ++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 104 insertions(+), 4 deletions(-)

diff --git a/banyand/trace/query.go b/banyand/trace/query.go
index 8aecc80f..5528f8a2 100644
--- a/banyand/trace/query.go
+++ b/banyand/trace/query.go
@@ -101,6 +101,7 @@ func (t *trace) Query(ctx context.Context, tqo model.TraceQueryOptions) (model.T
 	pipelineCtx, cancel := context.WithTimeout(ctx, queryTimeout)
 	result.ctx = pipelineCtx
 	result.cancel = cancel
+	result.recordCursor, result.recordResult, result.finishResultSpan = startQueryResultSpan(pipelineCtx)
 
 	if result.keys == nil {
 		result.keys = make(map[string]int64)
@@ -244,17 +245,20 @@ func (t *trace) prepareSIDXStreaming(
 type queryResult struct {
 	ctx                 context.Context
 	err                 error
-	currentBatch        *scanBatch
-	tagProjection       *model.TagProjection
+	streamDone          <-chan struct{}
+	recordCursor        func(*blockCursor)
 	keys                map[string]int64
 	cursorBatchCh       <-chan *scanBatch
 	cancel              context.CancelFunc
 	currentCursorGroups map[string][]*blockCursor
-	streamDone          <-chan struct{}
+	currentBatch        *scanBatch
+	tagProjection       *model.TagProjection
+	finishResultSpan    func(int, error)
+	recordResult        func(*model.TraceResult)
 	currentTraceIDs     []string
 	segments            []storage.Segment[*tsTable, option]
-	currentIndex        int
 	hit                 int
+	currentIndex        int
 }
 
 func (qr *queryResult) Pull() *model.TraceResult {
@@ -313,6 +317,10 @@ func (qr *queryResult) Pull() *model.TraceResult {
 			qr.currentIndex++
 			delete(qr.currentCursorGroups, traceID)
 
+			if qr.recordResult != nil {
+				qr.recordResult(result)
+			}
+
 			return result
 		}
 	}
@@ -367,6 +375,9 @@ func (qr *queryResult) ensureCurrentBatch() bool {
 				return true
 			}
 			if result.cursor != nil {
+				if qr.recordCursor != nil {
+					qr.recordCursor(result.cursor)
+				}
 				traceID := result.cursor.bm.traceID
 				qr.currentCursorGroups[traceID] = append(qr.currentCursorGroups[traceID], result.cursor)
 			}
@@ -498,6 +509,16 @@ func (qr *queryResult) Release() {
 		qr.segments[i].DecRef()
 	}
 	qr.segments = qr.segments[:0]
+
+	qr.finishTracing(qr.err)
+}
+
+func (qr *queryResult) finishTracing(err error) {
+	if qr.finishResultSpan == nil {
+		return
+	}
+	qr.finishResultSpan(qr.hit, err)
+	qr.finishResultSpan = nil
 }
 
 func mustDecodeTagValue(valueType pbv1.ValueType, value []byte) *modelv1.TagValue {
diff --git a/banyand/trace/tracing.go b/banyand/trace/tracing.go
index f3ed8ac2..69d2a252 100644
--- a/banyand/trace/tracing.go
+++ b/banyand/trace/tracing.go
@@ -28,6 +28,7 @@ import (
 	"github.com/dustin/go-humanize"
 
 	"github.com/apache/skywalking-banyandb/pkg/query"
+	"github.com/apache/skywalking-banyandb/pkg/query/model"
 )
 
 const (
@@ -354,3 +355,81 @@ func startAggregatedBlockScanSpan(ctx context.Context, groupedIDs [][]string, pa
 		span.Stop()
 	}
 }
+
+// startQueryResultSpan records aggregated metrics for cursor consumption and trace results.
+// It returns recorders for cursor/result events and a finish function to complete the span.
+func startQueryResultSpan(ctx context.Context) (func(*blockCursor), func(*model.TraceResult), func(int, error)) {
+	tracer := query.GetTracer(ctx)
+	if tracer == nil {
+		return nil, nil, nil
+	}
+
+	span, _ := tracer.StartSpan(ctx, "query-result")
+
+	var (
+		cursorCount     int
+		cursorBytes     uint64
+		cursorTraceStat = make(map[string]int)
+		cursorSamples   []string
+
+		resultRecorded  int
+		resultTraceStat = make(map[string]int)
+		resultSamples   []string
+	)
+
+	recordCursor := func(bc *blockCursor) {
+		if bc == nil {
+			return
+		}
+		cursorCount++
+		cursorBytes += bc.bm.uncompressedSpanSizeBytes
+		cursorTraceStat[bc.bm.traceID]++
+		if len(cursorSamples) < traceIDSampleLimit {
+			cursorSamples = append(cursorSamples, fmt.Sprintf("%s:%d", bc.bm.traceID, bc.bm.count))
+		}
+	}
+
+	recordResult := func(res *model.TraceResult) {
+		if res == nil || res.Error != nil || res.TID == "" {
+			return
+		}
+		resultRecorded++
+		resultTraceStat[res.TID]++
+		if len(resultSamples) < traceIDSampleLimit {
+			resultSamples = append(resultSamples, fmt.Sprintf("%s:%d", res.TID, len(res.Spans)))
+		}
+	}
+
+	finish := func(hit int, err error) {
+		span.Tag("cursor_total", strconv.Itoa(cursorCount))
+		if cursorBytes > 0 {
+			span.Tag("cursor_bytes", humanize.Bytes(cursorBytes))
+		}
+		span.Tag("cursor_trace_total", strconv.Itoa(len(cursorTraceStat)))
+		if len(cursorSamples) > 0 {
+			span.Tag("cursor_sample", strings.Join(cursorSamples, ","))
+		}
+
+		span.Tag("result_recorded", strconv.Itoa(resultRecorded))
+		span.Tag("result_trace_total", strconv.Itoa(len(resultTraceStat)))
+		if len(resultSamples) > 0 {
+			span.Tag("result_sample", strings.Join(resultSamples, ","))
+		}
+
+		span.Tag("result_hit_count", strconv.Itoa(hit))
+		if resultRecorded != hit {
+			span.Tag("result_hit_mismatch", strconv.Itoa(resultRecorded-hit))
+		}
+		traceGap := len(cursorTraceStat) - len(resultTraceStat)
+		if traceGap > 0 {
+			span.Tag("result_trace_gap", strconv.Itoa(traceGap))
+		}
+
+		if err != nil {
+			span.Error(err)
+		}
+		span.Stop()
+	}
+
+	return recordCursor, recordResult, finish
+}
