This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch trace/query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 6ec1f8b0393fb67a923e169a0cff41d954f28b0a
Author: Gao Hongtao <[email protected]>
AuthorDate: Fri Aug 29 11:57:53 2025 +0800

    Refactor trace query execution logic to improve error handling and response formatting. Enhance the trace query processor with panic recovery and detailed logging. Update trace schema and analyzer interfaces for better compatibility and clarity.
---
 banyand/query/processor.go                       | 94 +++++++++++++++++++++---
 banyand/trace/trace.go                           |  3 +
 pkg/query/logical/trace/schema.go                |  2 +-
 pkg/query/logical/trace/trace_analyzer.go        |  6 +-
 pkg/query/logical/trace/trace_plan_tag_filter.go | 13 ----
 5 files changed, 89 insertions(+), 29 deletions(-)

diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index b7d855ef..347b998b 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -43,7 +43,6 @@ import (
        logical_measure 
"github.com/apache/skywalking-banyandb/pkg/query/logical/measure"
        logical_stream 
"github.com/apache/skywalking-banyandb/pkg/query/logical/stream"
        logical_trace 
"github.com/apache/skywalking-banyandb/pkg/query/logical/trace"
-       "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
 const (
@@ -51,11 +50,10 @@ const (
 )
 
 var (
-       _ run.PreRunner       = (*queryService)(nil)
-       _ bus.MessageListener = (*streamQueryProcessor)(nil)
-       _ bus.MessageListener = (*measureQueryProcessor)(nil)
-       _ bus.MessageListener = (*topNQueryProcessor)(nil)
-       _ bus.MessageListener = (*traceQueryProcessor)(nil)
+       _ bus.MessageListener            = (*streamQueryProcessor)(nil)
+       _ bus.MessageListener            = (*measureQueryProcessor)(nil)
+       _ bus.MessageListener            = (*traceQueryProcessor)(nil)
+       _ executor.TraceExecutionContext = trace.Trace(nil)
 )
 
 type streamQueryProcessor struct {
@@ -453,10 +451,19 @@ func (p *traceQueryProcessor) Rev(ctx context.Context, message bus.Message) (res
        return
 }
 
-func (p *traceQueryProcessor) executeQuery(_ context.Context, queryCriteria 
*tracev1.QueryRequest) (resp bus.Message) {
+func (p *traceQueryProcessor) executeQuery(ctx context.Context, queryCriteria 
*tracev1.QueryRequest) (resp bus.Message) {
        n := time.Now()
        now := n.UnixNano()
+       defer func() {
+               if err := recover(); err != nil {
+                       p.log.Error().Interface("err", err).RawJSON("req", 
logger.Proto(queryCriteria)).Str("stack", string(debug.Stack())).Msg("panic")
+                       resp = 
bus.NewMessage(bus.MessageID(time.Now().UnixNano()), common.NewError("panic"))
+               }
+       }()
+
+       var metadata []*commonv1.Metadata
        var schemas []logical.Schema
+       var ecc []executor.TraceExecutionContext
        for i := range queryCriteria.Groups {
                meta := &commonv1.Metadata{
                        Name:  queryCriteria.Name,
@@ -467,22 +474,85 @@ func (p *traceQueryProcessor) executeQuery(_ context.Context, queryCriteria *tra
                        resp = bus.NewMessage(bus.MessageID(now), 
common.NewError("fail to get execution context for trace %s: %v", 
meta.GetName(), err))
                        return
                }
+               ecc = append(ecc, ec)
                s, err := logical_trace.BuildSchema(ec.GetSchema(), 
ec.GetIndexRules())
                if err != nil {
                        resp = bus.NewMessage(bus.MessageID(now), 
common.NewError("fail to build schema for trace %s: %v", meta.GetName(), err))
                        return
                }
                schemas = append(schemas, s)
+               metadata = append(metadata, meta)
        }
 
-       // For now, we only implement BuildSchema - the query execution stops here
-       // This is exactly what was requested: "executeQuery stop at the schema parsing since we only implement the BuildSchema right now"
+       plan, err := logical_trace.Analyze(queryCriteria, metadata, schemas, 
ecc)
+       if err != nil {
+               resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail 
to analyze the query request for trace %s: %v", queryCriteria.GetName(), err))
+               return
+       }
 
        if p.log.Debug().Enabled() {
-               p.log.Debug().Int("schemas_built", len(schemas)).Msg("trace 
schemas built successfully, execution stopped at schema parsing")
+               p.log.Debug().Str("plan", plan.String()).Msg("trace query plan")
+       }
+
+       var tracer *query.Tracer
+       var span *query.Span
+       if queryCriteria.Trace {
+               tracer, ctx = query.NewTracer(ctx, n.Format(time.RFC3339Nano))
+               span, ctx = tracer.StartSpan(ctx, "data-%s", 
p.queryService.nodeID)
+               span.Tag("plan", plan.String())
+               defer func() {
+                       data := resp.Data()
+                       switch d := data.(type) {
+                       case *tracev1.QueryResponse:
+                               d.TraceQueryResult = tracer.ToProto()
+                       case *common.Error:
+                               span.Error(errors.New(d.Error()))
+                               resp = bus.NewMessage(bus.MessageID(now), 
&tracev1.QueryResponse{TraceQueryResult: tracer.ToProto()})
+                       default:
+                               panic("unexpected data type")
+                       }
+                       span.Stop()
+               }()
+       }
+
+       te := plan.(executor.TraceExecutable)
+       defer te.Close()
+       result, err := te.Execute(ctx)
+       if err != nil {
+               p.log.Error().Err(err).RawJSON("req", 
logger.Proto(queryCriteria)).Msg("fail to execute the trace query plan")
+               resp = bus.NewMessage(bus.MessageID(now), 
common.NewError("execute the query plan for trace %s: %v", 
queryCriteria.GetName(), err))
+               return
+       }
+
+       // Convert model.TraceResult to tracev1.QueryResponse format
+       spans := make([]*tracev1.Span, len(result.Spans))
+       for i, spanBytes := range result.Spans {
+               // Create trace tags from the result
+               var traceTags []*modelv1.Tag
+               if result.Tags != nil {
+                       for _, tag := range result.Tags {
+                               if len(tag.Values) > 0 {
+                                       traceTags = append(traceTags, 
&modelv1.Tag{
+                                               Key:   tag.Name,
+                                               Value: tag.Values[0],
+                                       })
+                               }
+                       }
+               }
+
+               spans[i] = &tracev1.Span{
+                       Tags: traceTags,
+                       Span: spanBytes,
+               }
        }
 
-       // Return an empty response indicating successful schema building but no actual execution
-       resp = bus.NewMessage(bus.MessageID(now), &tracev1.QueryResponse{Spans: 
make([]*tracev1.Span, 0)})
+       resp = bus.NewMessage(bus.MessageID(now), &tracev1.QueryResponse{Spans: 
spans})
+
+       if !queryCriteria.Trace && p.slowQuery > 0 {
+               latency := time.Since(n)
+               if latency > p.slowQuery {
+                       p.log.Warn().Dur("latency", latency).RawJSON("req", 
logger.Proto(queryCriteria)).Int("resp_count", len(spans)).Msg("trace slow 
query")
+               }
+       }
        return
 }
diff --git a/banyand/trace/trace.go b/banyand/trace/trace.go
index ed33c4f0..df1c57ff 100644
--- a/banyand/trace/trace.go
+++ b/banyand/trace/trace.go
@@ -22,6 +22,7 @@
 package trace
 
 import (
+       "context"
        "sync/atomic"
        "time"
 
@@ -31,6 +32,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/protector"
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/query/model"
        "github.com/apache/skywalking-banyandb/pkg/run"
        "github.com/apache/skywalking-banyandb/pkg/schema"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -74,6 +76,7 @@ type Query interface {
 type Trace interface {
        GetSchema() *databasev1.Trace
        GetIndexRules() []*databasev1.IndexRule
+       Query(ctx context.Context, opts model.TraceQueryOptions) 
(model.TraceQueryResult, error)
 }
 
 type indexSchema struct {
diff --git a/pkg/query/logical/trace/schema.go b/pkg/query/logical/trace/schema.go
index c27bc846..04c39dc8 100644
--- a/pkg/query/logical/trace/schema.go
+++ b/pkg/query/logical/trace/schema.go
@@ -74,7 +74,7 @@ func (s *schema) CreateFieldRef(_ ...*logical.Field) ([]*logical.FieldRef, error
        panic("no field for trace")
 }
 
-func (s *schema) IndexRuleDefined(indexRuleName string) (bool, 
*databasev1.IndexRule) {
+func (s *schema) IndexRuleDefined(_ string) (bool, *databasev1.IndexRule) {
        panic("trace does not support index rule")
 }
 
diff --git a/pkg/query/logical/trace/trace_analyzer.go b/pkg/query/logical/trace/trace_analyzer.go
index ee5bff71..0cbdbef6 100644
--- a/pkg/query/logical/trace/trace_analyzer.go
+++ b/pkg/query/logical/trace/trace_analyzer.go
@@ -230,7 +230,7 @@ type unresolvedTraceMerger struct {
        tagProjection [][]*logical.Tag
 }
 
-func (utm *unresolvedTraceMerger) Analyze(s logical.Schema) (logical.Plan, 
error) {
+func (utm *unresolvedTraceMerger) Analyze(_ logical.Schema) (logical.Plan, 
error) {
        // TODO: Implement trace merger logic
        return nil, fmt.Errorf("trace merger not implemented yet")
 }
@@ -246,7 +246,7 @@ type unresolvedTraceDistributed struct {
        criteria *tracev1.QueryRequest
 }
 
-func (utd *unresolvedTraceDistributed) Analyze(s logical.Schema) 
(logical.Plan, error) {
+func (utd *unresolvedTraceDistributed) Analyze(_ logical.Schema) 
(logical.Plan, error) {
        // TODO: Implement distributed trace analysis
        return nil, fmt.Errorf("distributed trace query not implemented yet")
 }
@@ -268,7 +268,7 @@ type distributedTraceLimit struct {
        limitNum  uint32
 }
 
-func (dtl *distributedTraceLimit) Analyze(s logical.Schema) (logical.Plan, 
error) {
+func (dtl *distributedTraceLimit) Analyze(_ logical.Schema) (logical.Plan, 
error) {
        // TODO: Implement distributed trace limit analysis
        return nil, fmt.Errorf("distributed trace limit not implemented yet")
 }
diff --git a/pkg/query/logical/trace/trace_plan_tag_filter.go b/pkg/query/logical/trace/trace_plan_tag_filter.go
index 7107cb8e..4c77d184 100644
--- a/pkg/query/logical/trace/trace_plan_tag_filter.go
+++ b/pkg/query/logical/trace/trace_plan_tag_filter.go
@@ -101,19 +101,6 @@ func (uis *unresolvedTraceTagFilter) selectTraceScanner(ctx *traceAnalyzeContext
        }
 }
 
-func traceTagFilter(startTime, endTime time.Time, metadata *commonv1.Metadata, 
criteria *modelv1.Criteria,
-       projection [][]*logical.Tag, ec executor.TraceExecutionContext,
-) logical.UnresolvedPlan {
-       return &unresolvedTraceTagFilter{
-               startTime:      startTime,
-               endTime:        endTime,
-               metadata:       metadata,
-               criteria:       criteria,
-               projectionTags: projection,
-               ec:             ec,
-       }
-}
-
 type traceAnalyzeContext struct {
        s                logical.Schema
        skippingFilter   index.Filter

Reply via email to