This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch trace/query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit b859b27081c932afba0ab2c92abceffb6b36e117 Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Fri Aug 29 07:57:35 2025 +0800 Enhance query service to support trace processing by adding a trace query processor and updating service initialization. Modify related test setups to accommodate the new trace service integration. --- banyand/measure/measure_suite_test.go | 2 +- banyand/query/processor.go | 66 +++++++++++++++++++++++++++++++++++ banyand/query/query.go | 10 +++++- banyand/stream/stream_suite_test.go | 2 +- pkg/cmdsetup/data.go | 7 +++- pkg/cmdsetup/standalone.go | 2 +- pkg/query/executor/interface.go | 12 +++++++ 7 files changed, 96 insertions(+), 5 deletions(-) diff --git a/banyand/measure/measure_suite_test.go b/banyand/measure/measure_suite_test.go index e4dd13b3..4fa2a474 100644 --- a/banyand/measure/measure_suite_test.go +++ b/banyand/measure/measure_suite_test.go @@ -81,7 +81,7 @@ func setUp() (*services, func()) { measureService, err := measure.NewStandalone(metadataService, pipeline, nil, metricSvc, pm) gomega.Expect(err).NotTo(gomega.HaveOccurred()) preloadMeasureSvc := &preloadMeasureService{metaSvc: metadataService} - querySvc, err := query.NewService(context.TODO(), nil, measureService, metadataService, pipeline) + querySvc, err := query.NewService(context.TODO(), nil, measureService, nil, metadataService, pipeline) gomega.Expect(err).NotTo(gomega.HaveOccurred()) var flags []string diff --git a/banyand/query/processor.go b/banyand/query/processor.go index 203d2dd5..b7d855ef 100644 --- a/banyand/query/processor.go +++ b/banyand/query/processor.go @@ -31,8 +31,10 @@ import ( measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" + tracev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1" "github.com/apache/skywalking-banyandb/banyand/measure" "github.com/apache/skywalking-banyandb/banyand/stream" + "github.com/apache/skywalking-banyandb/banyand/trace" "github.com/apache/skywalking-banyandb/pkg/bus" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/query" @@ -40,6 +42,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/query/logical" logical_measure "github.com/apache/skywalking-banyandb/pkg/query/logical/measure" logical_stream "github.com/apache/skywalking-banyandb/pkg/query/logical/stream" + logical_trace "github.com/apache/skywalking-banyandb/pkg/query/logical/trace" "github.com/apache/skywalking-banyandb/pkg/run" ) @@ -52,6 +55,7 @@ var ( _ bus.MessageListener = (*streamQueryProcessor)(nil) _ bus.MessageListener = (*measureQueryProcessor)(nil) _ bus.MessageListener = (*topNQueryProcessor)(nil) + _ bus.MessageListener = (*traceQueryProcessor)(nil) ) type streamQueryProcessor struct { @@ -420,3 +424,65 @@ func buildCriteriaTree(conditions []*modelv1.Condition) *modelv1.Criteria { }, } } + +type traceQueryProcessor struct { + traceService trace.Service + *queryService + *bus.UnImplementedHealthyListener +} + +func (p *traceQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp bus.Message) { + n := time.Now() + now := n.UnixNano() + queryCriteria, ok := message.Data().(*tracev1.QueryRequest) + if !ok { + resp = bus.NewMessage(bus.MessageID(now), common.NewError("invalid event data type")) + return + } + if p.log.Debug().Enabled() { + p.log.Debug().RawJSON("criteria", logger.Proto(queryCriteria)).Msg("received a trace query request") + } + defer func() { + if err := recover(); err != nil { + p.log.Error().Interface("err", err).RawJSON("req", logger.Proto(queryCriteria)).Str("stack", string(debug.Stack())).Msg("panic") + resp = bus.NewMessage(bus.MessageID(time.Now().UnixNano()), common.NewError("panic")) + } + }() + + resp = p.executeQuery(ctx, queryCriteria) + return +} + +func (p *traceQueryProcessor) executeQuery(_ context.Context, queryCriteria *tracev1.QueryRequest) (resp bus.Message) { + n := time.Now() + now := n.UnixNano() + var schemas []logical.Schema + for i := range queryCriteria.Groups { + meta := &commonv1.Metadata{ + Name: queryCriteria.Name, + Group: queryCriteria.Groups[i], + } + ec, err := p.traceService.Trace(meta) + if err != nil { + resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to get execution context for trace %s: %v", meta.GetName(), err)) + return + } + s, err := logical_trace.BuildSchema(ec.GetSchema(), ec.GetIndexRules()) + if err != nil { + resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to build schema for trace %s: %v", meta.GetName(), err)) + return + } + schemas = append(schemas, s) + } + + // For now, we only implement BuildSchema - the query execution stops here + // This is exactly what was requested: "executeQuery stop at the schema parsing since we only implement the BuildSchema right now" + + if p.log.Debug().Enabled() { + p.log.Debug().Int("schemas_built", len(schemas)).Msg("trace schemas built successfully, execution stopped at schema parsing") + } + + // Return an empty response indicating successful schema building but no actual execution + resp = bus.NewMessage(bus.MessageID(now), &tracev1.QueryResponse{Spans: make([]*tracev1.Span, 0)}) + return +} diff --git a/banyand/query/query.go b/banyand/query/query.go index ef9315af..e11c04a5 100644 --- a/banyand/query/query.go +++ b/banyand/query/query.go @@ -31,6 +31,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/banyand/stream" + "github.com/apache/skywalking-banyandb/banyand/trace" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/run" ) @@ -42,12 +43,13 @@ type queryService struct { sqp *streamQueryProcessor mqp *measureQueryProcessor tqp *topNQueryProcessor + trqp *traceQueryProcessor nodeID string slowQuery time.Duration } // NewService return a new query service. -func NewService(_ context.Context, streamService stream.Service, measureService measure.Service, +func NewService(_ context.Context, streamService stream.Service, measureService measure.Service, traceService trace.Service, metaService metadata.Repo, pipeline queue.Server, ) (run.Unit, error) { svc := &queryService{ @@ -69,6 +71,11 @@ func NewService(_ context.Context, streamService stream.Service, measureService measureService: measureService, queryService: svc, } + // trace query processor + svc.trqp = &traceQueryProcessor{ + traceService: traceService, + queryService: svc, + } return svc, nil } @@ -88,6 +95,7 @@ func (q *queryService) PreRun(ctx context.Context) error { q.pipeline.Subscribe(data.TopicStreamQuery, q.sqp), q.pipeline.Subscribe(data.TopicMeasureQuery, q.mqp), q.pipeline.Subscribe(data.TopicTopNQuery, q.tqp), + q.pipeline.Subscribe(data.TopicTraceQuery, q.trqp), ) } diff --git a/banyand/stream/stream_suite_test.go b/banyand/stream/stream_suite_test.go index 57d9b430..43c06907 100644 --- a/banyand/stream/stream_suite_test.go +++ b/banyand/stream/stream_suite_test.go @@ -82,7 +82,7 @@ func setUp() (*services, func()) { streamService, err := stream.NewService(metadataService, pipeline, metricSvc, pm, nil) gomega.Expect(err).NotTo(gomega.HaveOccurred()) preloadStreamSvc := &preloadStreamService{metaSvc: metadataService} - querySvc, err := query.NewService(context.TODO(), streamService, nil, metadataService, pipeline) + querySvc, err := query.NewService(context.TODO(), streamService, nil, nil, metadataService, pipeline) gomega.Expect(err).NotTo(gomega.HaveOccurred()) var flags []string metaPath, metaDeferFunc, err := test.NewSpace() diff --git a/pkg/cmdsetup/data.go b/pkg/cmdsetup/data.go index be325a8a..f0c8eb6c 100644 --- a/pkg/cmdsetup/data.go +++ b/pkg/cmdsetup/data.go @@ -33,6 +33,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/banyand/queue/sub" "github.com/apache/skywalking-banyandb/banyand/stream" + "github.com/apache/skywalking-banyandb/banyand/trace" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/run" "github.com/apache/skywalking-banyandb/pkg/version" @@ -62,7 +63,11 @@ func newDataCmd(runners ...run.Unit) *cobra.Command { if err != nil { l.Fatal().Err(err).Msg("failed to initiate measure service") } - q, err := query.NewService(ctx, streamSvc, measureSvc, metaSvc, pipeline) + traceSvc, err := trace.NewService(metaSvc, pipeline, metricSvc, pm) + if err != nil { + l.Fatal().Err(err).Msg("failed to initiate trace service") + } + q, err := query.NewService(ctx, streamSvc, measureSvc, traceSvc, metaSvc, pipeline) if err != nil { l.Fatal().Err(err).Msg("failed to initiate query processor") } diff --git a/pkg/cmdsetup/standalone.go b/pkg/cmdsetup/standalone.go index fb6c2028..5835bbb0 100644 --- a/pkg/cmdsetup/standalone.go +++ b/pkg/cmdsetup/standalone.go @@ -70,7 +70,7 @@ func newStandaloneCmd(runners ...run.Unit) *cobra.Command { if err != nil { l.Fatal().Err(err).Msg("failed to initiate measure service") } - q, err := query.NewService(ctx, streamSvc, measureSvc, metaSvc, dataPipeline) + q, err := query.NewService(ctx, streamSvc, measureSvc, traceSvc, metaSvc, dataPipeline) if err != nil { l.Fatal().Err(err).Msg("failed to initiate query processor") } diff --git a/pkg/query/executor/interface.go b/pkg/query/executor/interface.go index 14323577..e1dbc4c9 100644 --- a/pkg/query/executor/interface.go +++ b/pkg/query/executor/interface.go @@ -24,6 +24,7 @@ import ( measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" + tracev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1" "github.com/apache/skywalking-banyandb/pkg/bus" "github.com/apache/skywalking-banyandb/pkg/query/model" ) @@ -79,3 +80,14 @@ func WithDistributedExecutionContext(ctx context.Context, ec DistributedExecutio func FromDistributedExecutionContext(ctx context.Context) DistributedExecutionContext { return ctx.Value(distributedExecutionContextKeyInstance).(DistributedExecutionContext) } + +// TraceExecutionContext allows retrieving data through the trace module. +type TraceExecutionContext interface { + Query(ctx context.Context, opts model.TraceQueryOptions) (model.TraceQueryResult, error) +} + +// TraceExecutable allows querying in the trace schema. +type TraceExecutable interface { + Execute(context.Context) ([]*tracev1.Span, error) + Close() +}