This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch trace/query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit b859b27081c932afba0ab2c92abceffb6b36e117
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Fri Aug 29 07:57:35 2025 +0800

    Enhance query service to support trace processing by adding a trace query 
processor and updating service initialization. Modify related test setups to 
accommodate the new trace service integration.
---
 banyand/measure/measure_suite_test.go |  2 +-
 banyand/query/processor.go            | 66 +++++++++++++++++++++++++++++++++++
 banyand/query/query.go                | 10 +++++-
 banyand/stream/stream_suite_test.go   |  2 +-
 pkg/cmdsetup/data.go                  |  7 +++-
 pkg/cmdsetup/standalone.go            |  2 +-
 pkg/query/executor/interface.go       | 12 +++++++
 7 files changed, 96 insertions(+), 5 deletions(-)

diff --git a/banyand/measure/measure_suite_test.go 
b/banyand/measure/measure_suite_test.go
index e4dd13b3..4fa2a474 100644
--- a/banyand/measure/measure_suite_test.go
+++ b/banyand/measure/measure_suite_test.go
@@ -81,7 +81,7 @@ func setUp() (*services, func()) {
        measureService, err := measure.NewStandalone(metadataService, pipeline, 
nil, metricSvc, pm)
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
        preloadMeasureSvc := &preloadMeasureService{metaSvc: metadataService}
-       querySvc, err := query.NewService(context.TODO(), nil, measureService, 
metadataService, pipeline)
+       querySvc, err := query.NewService(context.TODO(), nil, measureService, 
nil, metadataService, pipeline)
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
 
        var flags []string
diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index 203d2dd5..b7d855ef 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -31,8 +31,10 @@ import (
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+       tracev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
        "github.com/apache/skywalking-banyandb/banyand/measure"
        "github.com/apache/skywalking-banyandb/banyand/stream"
+       "github.com/apache/skywalking-banyandb/banyand/trace"
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/query"
@@ -40,6 +42,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
        logical_measure 
"github.com/apache/skywalking-banyandb/pkg/query/logical/measure"
        logical_stream 
"github.com/apache/skywalking-banyandb/pkg/query/logical/stream"
+       logical_trace 
"github.com/apache/skywalking-banyandb/pkg/query/logical/trace"
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
@@ -52,6 +55,7 @@ var (
        _ bus.MessageListener = (*streamQueryProcessor)(nil)
        _ bus.MessageListener = (*measureQueryProcessor)(nil)
        _ bus.MessageListener = (*topNQueryProcessor)(nil)
+       _ bus.MessageListener = (*traceQueryProcessor)(nil)
 )
 
 type streamQueryProcessor struct {
@@ -420,3 +424,65 @@ func buildCriteriaTree(conditions []*modelv1.Condition) 
*modelv1.Criteria {
                },
        }
 }
+
+type traceQueryProcessor struct {
+       traceService trace.Service
+       *queryService
+       *bus.UnImplementedHealthyListener
+}
+
+func (p *traceQueryProcessor) Rev(ctx context.Context, message bus.Message) 
(resp bus.Message) {
+       n := time.Now()
+       now := n.UnixNano()
+       queryCriteria, ok := message.Data().(*tracev1.QueryRequest)
+       if !ok {
+               resp = bus.NewMessage(bus.MessageID(now), 
common.NewError("invalid event data type"))
+               return
+       }
+       if p.log.Debug().Enabled() {
+               p.log.Debug().RawJSON("criteria", 
logger.Proto(queryCriteria)).Msg("received a trace query request")
+       }
+       defer func() {
+               if err := recover(); err != nil {
+                       p.log.Error().Interface("err", err).RawJSON("req", 
logger.Proto(queryCriteria)).Str("stack", string(debug.Stack())).Msg("panic")
+                       resp = 
bus.NewMessage(bus.MessageID(time.Now().UnixNano()), common.NewError("panic"))
+               }
+       }()
+
+       resp = p.executeQuery(ctx, queryCriteria)
+       return
+}
+
+func (p *traceQueryProcessor) executeQuery(_ context.Context, queryCriteria 
*tracev1.QueryRequest) (resp bus.Message) {
+       n := time.Now()
+       now := n.UnixNano()
+       var schemas []logical.Schema
+       for i := range queryCriteria.Groups {
+               meta := &commonv1.Metadata{
+                       Name:  queryCriteria.Name,
+                       Group: queryCriteria.Groups[i],
+               }
+               ec, err := p.traceService.Trace(meta)
+               if err != nil {
+                       resp = bus.NewMessage(bus.MessageID(now), 
common.NewError("fail to get execution context for trace %s: %v", 
meta.GetName(), err))
+                       return
+               }
+               s, err := logical_trace.BuildSchema(ec.GetSchema(), 
ec.GetIndexRules())
+               if err != nil {
+                       resp = bus.NewMessage(bus.MessageID(now), 
common.NewError("fail to build schema for trace %s: %v", meta.GetName(), err))
+                       return
+               }
+               schemas = append(schemas, s)
+       }
+
+       // For now, we only implement BuildSchema - the query execution stops 
here
+       // This is exactly what was requested: "executeQuery stop at the schema 
parsing since we only implement the BuildSchema right now"
+
+       if p.log.Debug().Enabled() {
+               p.log.Debug().Int("schemas_built", len(schemas)).Msg("trace 
schemas built successfully, execution stopped at schema parsing")
+       }
+
+       // Return an empty response indicating successful schema building but 
no actual execution
+       resp = bus.NewMessage(bus.MessageID(now), &tracev1.QueryResponse{Spans: 
make([]*tracev1.Span, 0)})
+       return
+}
diff --git a/banyand/query/query.go b/banyand/query/query.go
index ef9315af..e11c04a5 100644
--- a/banyand/query/query.go
+++ b/banyand/query/query.go
@@ -31,6 +31,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/banyand/stream"
+       "github.com/apache/skywalking-banyandb/banyand/trace"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
@@ -42,12 +43,13 @@ type queryService struct {
        sqp         *streamQueryProcessor
        mqp         *measureQueryProcessor
        tqp         *topNQueryProcessor
+       trqp        *traceQueryProcessor
        nodeID      string
        slowQuery   time.Duration
 }
 
 // NewService return a new query service.
-func NewService(_ context.Context, streamService stream.Service, 
measureService measure.Service,
+func NewService(_ context.Context, streamService stream.Service, 
measureService measure.Service, traceService trace.Service,
        metaService metadata.Repo, pipeline queue.Server,
 ) (run.Unit, error) {
        svc := &queryService{
@@ -69,6 +71,11 @@ func NewService(_ context.Context, streamService 
stream.Service, measureService
                measureService: measureService,
                queryService:   svc,
        }
+       // trace query processor
+       svc.trqp = &traceQueryProcessor{
+               traceService: traceService,
+               queryService: svc,
+       }
        return svc, nil
 }
 
@@ -88,6 +95,7 @@ func (q *queryService) PreRun(ctx context.Context) error {
                q.pipeline.Subscribe(data.TopicStreamQuery, q.sqp),
                q.pipeline.Subscribe(data.TopicMeasureQuery, q.mqp),
                q.pipeline.Subscribe(data.TopicTopNQuery, q.tqp),
+               q.pipeline.Subscribe(data.TopicTraceQuery, q.trqp),
        )
 }
 
diff --git a/banyand/stream/stream_suite_test.go 
b/banyand/stream/stream_suite_test.go
index 57d9b430..43c06907 100644
--- a/banyand/stream/stream_suite_test.go
+++ b/banyand/stream/stream_suite_test.go
@@ -82,7 +82,7 @@ func setUp() (*services, func()) {
        streamService, err := stream.NewService(metadataService, pipeline, 
metricSvc, pm, nil)
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
        preloadStreamSvc := &preloadStreamService{metaSvc: metadataService}
-       querySvc, err := query.NewService(context.TODO(), streamService, nil, 
metadataService, pipeline)
+       querySvc, err := query.NewService(context.TODO(), streamService, nil, 
nil, metadataService, pipeline)
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
        var flags []string
        metaPath, metaDeferFunc, err := test.NewSpace()
diff --git a/pkg/cmdsetup/data.go b/pkg/cmdsetup/data.go
index be325a8a..f0c8eb6c 100644
--- a/pkg/cmdsetup/data.go
+++ b/pkg/cmdsetup/data.go
@@ -33,6 +33,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/banyand/queue/sub"
        "github.com/apache/skywalking-banyandb/banyand/stream"
+       "github.com/apache/skywalking-banyandb/banyand/trace"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
        "github.com/apache/skywalking-banyandb/pkg/version"
@@ -62,7 +63,11 @@ func newDataCmd(runners ...run.Unit) *cobra.Command {
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate measure service")
        }
-       q, err := query.NewService(ctx, streamSvc, measureSvc, metaSvc, 
pipeline)
+       traceSvc, err := trace.NewService(metaSvc, pipeline, metricSvc, pm)
+       if err != nil {
+               l.Fatal().Err(err).Msg("failed to initiate trace service")
+       }
+       q, err := query.NewService(ctx, streamSvc, measureSvc, traceSvc, 
metaSvc, pipeline)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate query processor")
        }
diff --git a/pkg/cmdsetup/standalone.go b/pkg/cmdsetup/standalone.go
index fb6c2028..5835bbb0 100644
--- a/pkg/cmdsetup/standalone.go
+++ b/pkg/cmdsetup/standalone.go
@@ -70,7 +70,7 @@ func newStandaloneCmd(runners ...run.Unit) *cobra.Command {
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate measure service")
        }
-       q, err := query.NewService(ctx, streamSvc, measureSvc, metaSvc, 
dataPipeline)
+       q, err := query.NewService(ctx, streamSvc, measureSvc, traceSvc, 
metaSvc, dataPipeline)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate query processor")
        }
diff --git a/pkg/query/executor/interface.go b/pkg/query/executor/interface.go
index 14323577..e1dbc4c9 100644
--- a/pkg/query/executor/interface.go
+++ b/pkg/query/executor/interface.go
@@ -24,6 +24,7 @@ import (
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+       tracev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
 )
@@ -79,3 +80,14 @@ func WithDistributedExecutionContext(ctx context.Context, ec 
DistributedExecutio
 func FromDistributedExecutionContext(ctx context.Context) 
DistributedExecutionContext {
        return 
ctx.Value(distributedExecutionContextKeyInstance).(DistributedExecutionContext)
 }
+
+// TraceExecutionContext allows retrieving data through the trace module.
+type TraceExecutionContext interface {
+       Query(ctx context.Context, opts model.TraceQueryOptions) 
(model.TraceQueryResult, error)
+}
+
+// TraceExecutable allows querying in the trace schema.
+type TraceExecutable interface {
+       Execute(context.Context) ([]*tracev1.Span, error)
+       Close()
+}

Reply via email to