This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch trace/query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 6ec1f8b0393fb67a923e169a0cff41d954f28b0a Author: Gao Hongtao <[email protected]> AuthorDate: Fri Aug 29 11:57:53 2025 +0800 Refactor trace query execution logic to improve error handling and response formatting. Enhance the trace query processor with panic recovery and detailed logging. Update trace schema and analyzer interfaces for better compatibility and clarity. --- banyand/query/processor.go | 94 +++++++++++++++++++++--- banyand/trace/trace.go | 3 + pkg/query/logical/trace/schema.go | 2 +- pkg/query/logical/trace/trace_analyzer.go | 6 +- pkg/query/logical/trace/trace_plan_tag_filter.go | 13 ---- 5 files changed, 89 insertions(+), 29 deletions(-) diff --git a/banyand/query/processor.go b/banyand/query/processor.go index b7d855ef..347b998b 100644 --- a/banyand/query/processor.go +++ b/banyand/query/processor.go @@ -43,7 +43,6 @@ import ( logical_measure "github.com/apache/skywalking-banyandb/pkg/query/logical/measure" logical_stream "github.com/apache/skywalking-banyandb/pkg/query/logical/stream" logical_trace "github.com/apache/skywalking-banyandb/pkg/query/logical/trace" - "github.com/apache/skywalking-banyandb/pkg/run" ) const ( @@ -51,11 +50,10 @@ const ( ) var ( - _ run.PreRunner = (*queryService)(nil) - _ bus.MessageListener = (*streamQueryProcessor)(nil) - _ bus.MessageListener = (*measureQueryProcessor)(nil) - _ bus.MessageListener = (*topNQueryProcessor)(nil) - _ bus.MessageListener = (*traceQueryProcessor)(nil) + _ bus.MessageListener = (*streamQueryProcessor)(nil) + _ bus.MessageListener = (*measureQueryProcessor)(nil) + _ bus.MessageListener = (*traceQueryProcessor)(nil) + _ executor.TraceExecutionContext = trace.Trace(nil) ) type streamQueryProcessor struct { @@ -453,10 +451,19 @@ func (p *traceQueryProcessor) Rev(ctx context.Context, message bus.Message) (res return } -func (p *traceQueryProcessor) executeQuery(_ context.Context, queryCriteria *tracev1.QueryRequest) (resp bus.Message) { +func (p *traceQueryProcessor) executeQuery(ctx 
context.Context, queryCriteria *tracev1.QueryRequest) (resp bus.Message) { n := time.Now() now := n.UnixNano() + defer func() { + if err := recover(); err != nil { + p.log.Error().Interface("err", err).RawJSON("req", logger.Proto(queryCriteria)).Str("stack", string(debug.Stack())).Msg("panic") + resp = bus.NewMessage(bus.MessageID(time.Now().UnixNano()), common.NewError("panic")) + } + }() + + var metadata []*commonv1.Metadata var schemas []logical.Schema + var ecc []executor.TraceExecutionContext for i := range queryCriteria.Groups { meta := &commonv1.Metadata{ Name: queryCriteria.Name, @@ -467,22 +474,85 @@ func (p *traceQueryProcessor) executeQuery(_ context.Context, queryCriteria *tra resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to get execution context for trace %s: %v", meta.GetName(), err)) return } + ecc = append(ecc, ec) s, err := logical_trace.BuildSchema(ec.GetSchema(), ec.GetIndexRules()) if err != nil { resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to build schema for trace %s: %v", meta.GetName(), err)) return } schemas = append(schemas, s) + metadata = append(metadata, meta) } - // For now, we only implement BuildSchema - the query execution stops here - // This is exactly what was requested: "executeQuery stop at the schema parsing since we only implement the BuildSchema right now" + plan, err := logical_trace.Analyze(queryCriteria, metadata, schemas, ecc) + if err != nil { + resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to analyze the query request for trace %s: %v", queryCriteria.GetName(), err)) + return + } if p.log.Debug().Enabled() { - p.log.Debug().Int("schemas_built", len(schemas)).Msg("trace schemas built successfully, execution stopped at schema parsing") + p.log.Debug().Str("plan", plan.String()).Msg("trace query plan") + } + + var tracer *query.Tracer + var span *query.Span + if queryCriteria.Trace { + tracer, ctx = query.NewTracer(ctx, n.Format(time.RFC3339Nano)) + span, ctx = 
tracer.StartSpan(ctx, "data-%s", p.queryService.nodeID) + span.Tag("plan", plan.String()) + defer func() { + data := resp.Data() + switch d := data.(type) { + case *tracev1.QueryResponse: + d.TraceQueryResult = tracer.ToProto() + case *common.Error: + span.Error(errors.New(d.Error())) + resp = bus.NewMessage(bus.MessageID(now), &tracev1.QueryResponse{TraceQueryResult: tracer.ToProto()}) + default: + panic("unexpected data type") + } + span.Stop() + }() + } + + te := plan.(executor.TraceExecutable) + defer te.Close() + result, err := te.Execute(ctx) + if err != nil { + p.log.Error().Err(err).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to execute the trace query plan") + resp = bus.NewMessage(bus.MessageID(now), common.NewError("execute the query plan for trace %s: %v", queryCriteria.GetName(), err)) + return + } + + // Convert model.TraceResult to tracev1.QueryResponse format + spans := make([]*tracev1.Span, len(result.Spans)) + for i, spanBytes := range result.Spans { + // Create trace tags from the result + var traceTags []*modelv1.Tag + if result.Tags != nil { + for _, tag := range result.Tags { + if len(tag.Values) > 0 { + traceTags = append(traceTags, &modelv1.Tag{ + Key: tag.Name, + Value: tag.Values[0], + }) + } + } + } + + spans[i] = &tracev1.Span{ + Tags: traceTags, + Span: spanBytes, + } } - // Return an empty response indicating successful schema building but no actual execution - resp = bus.NewMessage(bus.MessageID(now), &tracev1.QueryResponse{Spans: make([]*tracev1.Span, 0)}) + resp = bus.NewMessage(bus.MessageID(now), &tracev1.QueryResponse{Spans: spans}) + + if !queryCriteria.Trace && p.slowQuery > 0 { + latency := time.Since(n) + if latency > p.slowQuery { + p.log.Warn().Dur("latency", latency).RawJSON("req", logger.Proto(queryCriteria)).Int("resp_count", len(spans)).Msg("trace slow query") + } + } return } diff --git a/banyand/trace/trace.go b/banyand/trace/trace.go index ed33c4f0..df1c57ff 100644 --- a/banyand/trace/trace.go +++ 
b/banyand/trace/trace.go @@ -22,6 +22,7 @@ package trace import ( + "context" "sync/atomic" "time" @@ -31,6 +32,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/protector" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/query/model" "github.com/apache/skywalking-banyandb/pkg/run" "github.com/apache/skywalking-banyandb/pkg/schema" "github.com/apache/skywalking-banyandb/pkg/timestamp" @@ -74,6 +76,7 @@ type Query interface { type Trace interface { GetSchema() *databasev1.Trace GetIndexRules() []*databasev1.IndexRule + Query(ctx context.Context, opts model.TraceQueryOptions) (model.TraceQueryResult, error) } type indexSchema struct { diff --git a/pkg/query/logical/trace/schema.go b/pkg/query/logical/trace/schema.go index c27bc846..04c39dc8 100644 --- a/pkg/query/logical/trace/schema.go +++ b/pkg/query/logical/trace/schema.go @@ -74,7 +74,7 @@ func (s *schema) CreateFieldRef(_ ...*logical.Field) ([]*logical.FieldRef, error panic("no field for trace") } -func (s *schema) IndexRuleDefined(indexRuleName string) (bool, *databasev1.IndexRule) { +func (s *schema) IndexRuleDefined(_ string) (bool, *databasev1.IndexRule) { panic("trace does not support index rule") } diff --git a/pkg/query/logical/trace/trace_analyzer.go b/pkg/query/logical/trace/trace_analyzer.go index ee5bff71..0cbdbef6 100644 --- a/pkg/query/logical/trace/trace_analyzer.go +++ b/pkg/query/logical/trace/trace_analyzer.go @@ -230,7 +230,7 @@ type unresolvedTraceMerger struct { tagProjection [][]*logical.Tag } -func (utm *unresolvedTraceMerger) Analyze(s logical.Schema) (logical.Plan, error) { +func (utm *unresolvedTraceMerger) Analyze(_ logical.Schema) (logical.Plan, error) { // TODO: Implement trace merger logic return nil, fmt.Errorf("trace merger not implemented yet") } @@ -246,7 +246,7 @@ type unresolvedTraceDistributed struct { criteria *tracev1.QueryRequest } -func (utd 
*unresolvedTraceDistributed) Analyze(s logical.Schema) (logical.Plan, error) { +func (utd *unresolvedTraceDistributed) Analyze(_ logical.Schema) (logical.Plan, error) { // TODO: Implement distributed trace analysis return nil, fmt.Errorf("distributed trace query not implemented yet") } @@ -268,7 +268,7 @@ type distributedTraceLimit struct { limitNum uint32 } -func (dtl *distributedTraceLimit) Analyze(s logical.Schema) (logical.Plan, error) { +func (dtl *distributedTraceLimit) Analyze(_ logical.Schema) (logical.Plan, error) { // TODO: Implement distributed trace limit analysis return nil, fmt.Errorf("distributed trace limit not implemented yet") } diff --git a/pkg/query/logical/trace/trace_plan_tag_filter.go b/pkg/query/logical/trace/trace_plan_tag_filter.go index 7107cb8e..4c77d184 100644 --- a/pkg/query/logical/trace/trace_plan_tag_filter.go +++ b/pkg/query/logical/trace/trace_plan_tag_filter.go @@ -101,19 +101,6 @@ func (uis *unresolvedTraceTagFilter) selectTraceScanner(ctx *traceAnalyzeContext } } -func traceTagFilter(startTime, endTime time.Time, metadata *commonv1.Metadata, criteria *modelv1.Criteria, - projection [][]*logical.Tag, ec executor.TraceExecutionContext, -) logical.UnresolvedPlan { - return &unresolvedTraceTagFilter{ - startTime: startTime, - endTime: endTime, - metadata: metadata, - criteria: criteria, - projectionTags: projection, - ec: ec, - } -} - type traceAnalyzeContext struct { s logical.Schema skippingFilter index.Filter
