This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch trace/query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit ec4a46b00b3cac32f2e21b15f53f785119a1cf87 Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Fri Aug 29 19:54:40 2025 +0800 Enhance trace query processing by introducing trace ID handling in filtering and analysis. Update functions to pass trace ID tag names and collect trace IDs during query execution. Refactor related components for improved error handling and integration with existing trace logic. --- banyand/query/processor.go | 9 ++- banyand/trace/metadata.go | 2 + banyand/trace/trace.go | 9 +-- pkg/query/logical/trace/index_filter.go | 93 +++++++++++++++++----- pkg/query/logical/trace/trace_analyzer.go | 9 ++- pkg/query/logical/trace/trace_plan_local.go | 2 + pkg/query/logical/trace/trace_plan_tag_filter.go | 17 ++-- pkg/query/model/model.go | 4 +- pkg/schema/cache.go | 6 ++ test/cases/init.go | 6 +- test/cases/trace/data/input/all.yml | 5 ++ .../standalone/query/query_suite_test.go | 9 ++- 12 files changed, 126 insertions(+), 45 deletions(-) diff --git a/banyand/query/processor.go b/banyand/query/processor.go index 347b998b..18f8be84 100644 --- a/banyand/query/processor.go +++ b/banyand/query/processor.go @@ -464,6 +464,7 @@ func (p *traceQueryProcessor) executeQuery(ctx context.Context, queryCriteria *t var metadata []*commonv1.Metadata var schemas []logical.Schema var ecc []executor.TraceExecutionContext + var traceIDTagName string for i := range queryCriteria.Groups { meta := &commonv1.Metadata{ Name: queryCriteria.Name, @@ -482,9 +483,15 @@ func (p *traceQueryProcessor) executeQuery(ctx context.Context, queryCriteria *t } schemas = append(schemas, s) metadata = append(metadata, meta) + if traceIDTagName != "" && traceIDTagName != ec.GetSchema().GetTraceIdTagName() { + resp = bus.NewMessage(bus.MessageID(now), common.NewError("trace id tag name mismatch for trace %s: %s != %s", + meta.GetName(), traceIDTagName, ec.GetSchema().GetTraceIdTagName())) + return + } + traceIDTagName = ec.GetSchema().GetTraceIdTagName() } - plan, err := 
logical_trace.Analyze(queryCriteria, metadata, schemas, ecc) + plan, err := logical_trace.Analyze(queryCriteria, metadata, schemas, ecc, traceIDTagName) if err != nil { resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to analyze the query request for trace %s: %v", queryCriteria.GetName(), err)) return diff --git a/banyand/trace/metadata.go b/banyand/trace/metadata.go index b44836bf..398876a5 100644 --- a/banyand/trace/metadata.go +++ b/banyand/trace/metadata.go @@ -290,6 +290,7 @@ func newSupplier(path string, svc *standalone, nodeLabels map[string]string) *su pm: svc.pm, l: svc.l, nodeLabels: nodeLabels, + schemaRepo: &svc.schemaRepo, path: path, option: opt, } @@ -396,6 +397,7 @@ func newQueueSupplier(path string, svc *liaison, traceDataNodeRegistry grpc.Node l: svc.l, path: path, option: opt, + schemaRepo: &svc.schemaRepo, } } diff --git a/banyand/trace/trace.go b/banyand/trace/trace.go index df1c57ff..9538ce10 100644 --- a/banyand/trace/trace.go +++ b/banyand/trace/trace.go @@ -105,22 +105,21 @@ type trace struct { group string } -type traceSpec struct { - schema *databasev1.Trace -} - func (t *trace) GetSchema() *databasev1.Trace { return t.schema } func (t *trace) GetIndexRules() []*databasev1.IndexRule { if is := t.indexSchema.Load(); is != nil { - return is.(*indexSchema).indexRules + return is.(indexSchema).indexRules } return nil } func (t *trace) OnIndexUpdate(index []*databasev1.IndexRule) { + if len(index) == 0 { + return + } var is indexSchema is.indexRules = index is.parse(t.schema) diff --git a/pkg/query/logical/trace/index_filter.go b/pkg/query/logical/trace/index_filter.go index 227e162b..da60c9d4 100644 --- a/pkg/query/logical/trace/index_filter.go +++ b/pkg/query/logical/trace/index_filter.go @@ -32,13 +32,14 @@ import ( // Trace conditions are either entity or skipping index. // Trace creates explicit index rules for skipping index on all tags that don't belong to entity. 
func buildFilter(criteria *modelv1.Criteria, tagNames map[string]bool, - entityDict map[string]int, entity []*modelv1.TagValue, -) (index.Filter, [][]*modelv1.TagValue, []string, error) { + entityDict map[string]int, entity []*modelv1.TagValue, traceIDTagName string, +) (index.Filter, [][]*modelv1.TagValue, []string, []string, error) { if criteria == nil { - return nil, [][]*modelv1.TagValue{entity}, nil, nil + return nil, [][]*modelv1.TagValue{entity}, nil, nil, nil } var collectedTagNames []string + var traceIDs []string switch criteria.GetExp().(type) { case *modelv1.Criteria_Condition: cond := criteria.GetCondition() @@ -46,64 +47,74 @@ func buildFilter(criteria *modelv1.Criteria, tagNames map[string]bool, collectedTagNames = append(collectedTagNames, cond.Name) // Check if the tag name exists in the allowed tag names if !tagNames[cond.Name] { - return nil, nil, collectedTagNames, errors.Errorf("tag name '%s' not found in trace schema", cond.Name) + return nil, nil, collectedTagNames, traceIDs, errors.Errorf("tag name '%s' not found in trace schema", cond.Name) } + + // Extract trace IDs if this condition is for the trace ID tag + if cond.Name == traceIDTagName && (cond.Op == modelv1.Condition_BINARY_OP_EQ || cond.Op == modelv1.Condition_BINARY_OP_IN) { + traceIDs = extractTraceIDsFromCondition(cond) + } + _, parsedEntity, err := logical.ParseExprOrEntity(entityDict, entity, cond) if err != nil { - return nil, nil, collectedTagNames, err + return nil, nil, collectedTagNames, traceIDs, err } if parsedEntity != nil { - return nil, parsedEntity, collectedTagNames, nil + return nil, parsedEntity, collectedTagNames, traceIDs, nil } // For trace, all non-entity tags have skipping index expr, _, err := logical.ParseExprOrEntity(entityDict, entity, cond) if err != nil { - return nil, nil, collectedTagNames, err + return nil, nil, collectedTagNames, traceIDs, err } filter, entities, err := parseConditionToFilter(cond, entity, expr) - return filter, entities, 
collectedTagNames, err + return filter, entities, collectedTagNames, traceIDs, err case *modelv1.Criteria_Le: le := criteria.GetLe() if le.GetLeft() == nil && le.GetRight() == nil { - return nil, nil, nil, errors.WithMessagef(logical.ErrInvalidLogicalExpression, "both sides(left and right) of [%v] are empty", criteria) + return nil, nil, nil, traceIDs, errors.WithMessagef(logical.ErrInvalidLogicalExpression, "both sides(left and right) of [%v] are empty", criteria) } if le.GetLeft() == nil { - return buildFilter(le.Right, tagNames, entityDict, entity) + return buildFilter(le.Right, tagNames, entityDict, entity, traceIDTagName) } if le.GetRight() == nil { - return buildFilter(le.Left, tagNames, entityDict, entity) + return buildFilter(le.Left, tagNames, entityDict, entity, traceIDTagName) } - left, leftEntities, leftTagNames, err := buildFilter(le.Left, tagNames, entityDict, entity) + left, leftEntities, leftTagNames, leftTraceIDs, err := buildFilter(le.Left, tagNames, entityDict, entity, traceIDTagName) if err != nil { - return nil, nil, leftTagNames, err + return nil, nil, leftTagNames, leftTraceIDs, err } - right, rightEntities, rightTagNames, err := buildFilter(le.Right, tagNames, entityDict, entity) + right, rightEntities, rightTagNames, rightTraceIDs, err := buildFilter(le.Right, tagNames, entityDict, entity, traceIDTagName) if err != nil { - return nil, nil, append(leftTagNames, rightTagNames...), err + return nil, nil, append(leftTagNames, rightTagNames...), append(leftTraceIDs, rightTraceIDs...), err } // Merge tag names from both sides collectedTagNames = append(collectedTagNames, leftTagNames...) collectedTagNames = append(collectedTagNames, rightTagNames...) + // Merge trace IDs from both sides + traceIDs = append(traceIDs, leftTraceIDs...) + traceIDs = append(traceIDs, rightTraceIDs...) 
+ entities := logical.ParseEntities(le.Op, entity, leftEntities, rightEntities) if entities == nil { - return nil, nil, collectedTagNames, nil + return nil, nil, collectedTagNames, traceIDs, nil } if left == nil { - return right, entities, collectedTagNames, nil + return right, entities, collectedTagNames, traceIDs, nil } if right == nil { - return left, entities, collectedTagNames, nil + return left, entities, collectedTagNames, traceIDs, nil } switch le.Op { case modelv1.LogicalExpression_LOGICAL_OP_AND: - return &traceAndFilter{left: left, right: right}, entities, collectedTagNames, nil + return &traceAndFilter{left: left, right: right}, entities, collectedTagNames, traceIDs, nil case modelv1.LogicalExpression_LOGICAL_OP_OR: - return &traceOrFilter{left: left, right: right}, entities, collectedTagNames, nil + return &traceOrFilter{left: left, right: right}, entities, collectedTagNames, traceIDs, nil } } - return nil, nil, nil, logical.ErrInvalidCriteriaType + return nil, nil, nil, traceIDs, logical.ErrInvalidCriteriaType } func parseConditionToFilter(cond *modelv1.Condition, entity []*modelv1.TagValue, expr logical.LiteralExpr) (index.Filter, [][]*modelv1.TagValue, error) { @@ -289,3 +300,43 @@ func (tmf *traceMatchFilter) ShouldSkip(_ index.FilterOp) (bool, error) { func (tmf *traceMatchFilter) String() string { return tmf.op + ":" + tmf.tagName } + +// extractTraceIDsFromCondition extracts trace IDs from equal and in conditions. +func extractTraceIDsFromCondition(cond *modelv1.Condition) []string { + var traceIDs []string + + switch cond.Op { + case modelv1.Condition_BINARY_OP_EQ: + if cond.Value != nil && cond.Value.Value != nil { + switch val := cond.Value.Value.(type) { + case *modelv1.TagValue_Str: + if val.Str != nil { + traceIDs = append(traceIDs, val.Str.Value) + } + case *modelv1.TagValue_StrArray: + if val.StrArray != nil { + traceIDs = append(traceIDs, val.StrArray.Value...) 
+ } + } + } + case modelv1.Condition_BINARY_OP_IN: + if cond.Value != nil && cond.Value.Value != nil { + switch val := cond.Value.Value.(type) { + case *modelv1.TagValue_StrArray: + if val.StrArray != nil { + traceIDs = append(traceIDs, val.StrArray.Value...) + } + case *modelv1.TagValue_Str: + if val.Str != nil { + traceIDs = append(traceIDs, val.Str.Value) + } + } + } + case modelv1.Condition_BINARY_OP_NE, modelv1.Condition_BINARY_OP_LT, modelv1.Condition_BINARY_OP_GT, + modelv1.Condition_BINARY_OP_LE, modelv1.Condition_BINARY_OP_GE, modelv1.Condition_BINARY_OP_HAVING, + modelv1.Condition_BINARY_OP_NOT_HAVING, modelv1.Condition_BINARY_OP_NOT_IN, modelv1.Condition_BINARY_OP_MATCH: + // These operations don't support trace ID extraction + } + + return traceIDs +} diff --git a/pkg/query/logical/trace/trace_analyzer.go b/pkg/query/logical/trace/trace_analyzer.go index 0cbdbef6..5ac4504d 100644 --- a/pkg/query/logical/trace/trace_analyzer.go +++ b/pkg/query/logical/trace/trace_analyzer.go @@ -31,7 +31,9 @@ import ( const defaultLimit uint32 = 20 // Analyze converts logical expressions to executable operation tree represented by Plan. 
-func Analyze(criteria *tracev1.QueryRequest, metadata []*commonv1.Metadata, ss []logical.Schema, ecc []executor.TraceExecutionContext) (logical.Plan, error) { +func Analyze(criteria *tracev1.QueryRequest, metadata []*commonv1.Metadata, ss []logical.Schema, + ecc []executor.TraceExecutionContext, traceIDTagName string, +) (logical.Plan, error) { // parse fields if len(metadata) != len(ss) { return nil, fmt.Errorf("number of schemas %d not equal to number of metadata %d", len(ss), len(metadata)) @@ -40,7 +42,7 @@ func Analyze(criteria *tracev1.QueryRequest, metadata []*commonv1.Metadata, ss [ var s logical.Schema tagProjection := convertStringProjectionToTags(criteria.GetTagProjection()) if len(metadata) == 1 { - plan = parseTraceTags(criteria, metadata[0], ecc[0], tagProjection) + plan = parseTraceTags(criteria, metadata[0], ecc[0], tagProjection, traceIDTagName) s = ss[0] } else { var err error @@ -191,7 +193,7 @@ func newTraceLimit(input logical.UnresolvedPlan, offset, num uint32) logical.Unr } func parseTraceTags(criteria *tracev1.QueryRequest, metadata *commonv1.Metadata, - ec executor.TraceExecutionContext, tagProjection [][]*logical.Tag, + ec executor.TraceExecutionContext, tagProjection [][]*logical.Tag, traceIDTagName string, ) logical.UnresolvedPlan { timeRange := criteria.GetTimeRange() return &unresolvedTraceTagFilter{ @@ -201,6 +203,7 @@ func parseTraceTags(criteria *tracev1.QueryRequest, metadata *commonv1.Metadata, criteria: criteria.Criteria, projectionTags: tagProjection, ec: ec, + traceIDTagName: traceIDTagName, } } diff --git a/pkg/query/logical/trace/trace_plan_local.go b/pkg/query/logical/trace/trace_plan_local.go index 15552f39..2135ec3e 100644 --- a/pkg/query/logical/trace/trace_plan_local.go +++ b/pkg/query/logical/trace/trace_plan_local.go @@ -48,6 +48,7 @@ type localScan struct { projectionTags *model.TagProjection timeRange timestamp.TimeRange projectionTagRefs [][]*logical.TagRef + traceIDs []string maxTraceSize int } @@ -89,6 +90,7 @@ 
func (i *localScan) Execute(ctx context.Context) (model.TraceResult, error) { Order: orderBy, TagProjection: i.projectionTags, MaxTraceSize: i.maxTraceSize, + TraceIDs: i.traceIDs, }); err != nil { return model.TraceResult{}, err } diff --git a/pkg/query/logical/trace/trace_plan_tag_filter.go b/pkg/query/logical/trace/trace_plan_tag_filter.go index af1b78d2..c4cfbb4b 100644 --- a/pkg/query/logical/trace/trace_plan_tag_filter.go +++ b/pkg/query/logical/trace/trace_plan_tag_filter.go @@ -40,6 +40,7 @@ type unresolvedTraceTagFilter struct { ec executor.TraceExecutionContext metadata *commonv1.Metadata criteria *modelv1.Criteria + traceIDTagName string projectionTags [][]*logical.Tag } @@ -52,8 +53,9 @@ func (uis *unresolvedTraceTagFilter) Analyze(s logical.Schema) (logical.Plan, er } var err error var conditionTagNames []string + var traceIDs []string // For trace, we only use skipping filter (no inverted filter or entities) - ctx.skippingFilter, conditionTagNames, err = buildTraceFilter(uis.criteria, s, entityDict) + ctx.skippingFilter, conditionTagNames, traceIDs, err = buildTraceFilter(uis.criteria, s, entityDict, uis.traceIDTagName) if err != nil { return nil, err } @@ -89,7 +91,7 @@ func (uis *unresolvedTraceTagFilter) Analyze(s logical.Schema) (logical.Plan, er return nil, errProject } } - plan := uis.selectTraceScanner(ctx, uis.ec) + plan := uis.selectTraceScanner(ctx, uis.ec, traceIDs) if uis.criteria != nil { tagFilter, errFilter := logical.BuildTagFilter(uis.criteria, entityDict, s, len(ctx.globalConditions) > 1) if errFilter != nil { @@ -103,7 +105,7 @@ func (uis *unresolvedTraceTagFilter) Analyze(s logical.Schema) (logical.Plan, er return plan, err } -func (uis *unresolvedTraceTagFilter) selectTraceScanner(ctx *traceAnalyzeContext, ec executor.TraceExecutionContext) logical.Plan { +func (uis *unresolvedTraceTagFilter) selectTraceScanner(ctx *traceAnalyzeContext, ec executor.TraceExecutionContext, traceIDs []string) logical.Plan { return &localScan{ 
timeRange: timestamp.NewInclusiveTimeRange(uis.startTime, uis.endTime), schema: ctx.s, @@ -113,6 +115,7 @@ func (uis *unresolvedTraceTagFilter) selectTraceScanner(ctx *traceAnalyzeContext skippingFilter: ctx.skippingFilter, l: logger.GetLogger("query", "trace", "local-scan"), ec: ec, + traceIDs: traceIDs, } } @@ -146,9 +149,9 @@ func deduplicateStrings(strings []string) []string { // buildTraceFilter builds a filter for trace queries and returns both the filter and collected tag names. // Unlike stream, trace only needs skipping filter. -func buildTraceFilter(criteria *modelv1.Criteria, s logical.Schema, entityDict map[string]int) (index.Filter, []string, error) { +func buildTraceFilter(criteria *modelv1.Criteria, s logical.Schema, entityDict map[string]int, traceIDTagName string) (index.Filter, []string, []string, error) { if criteria == nil { - return nil, nil, nil + return nil, nil, nil, nil } // Create a map of valid tag names from the schema tagNames := make(map[string]bool) @@ -157,8 +160,8 @@ func buildTraceFilter(criteria *modelv1.Criteria, s logical.Schema, entityDict m tagNames[tagName] = true } - filter, _, collectedTagNames, err := buildFilter(criteria, tagNames, entityDict, nil) - return filter, collectedTagNames, err + filter, _, collectedTagNames, traceIDs, err := buildFilter(criteria, tagNames, entityDict, nil, traceIDTagName) + return filter, collectedTagNames, traceIDs, err } var ( diff --git a/pkg/query/model/model.go b/pkg/query/model/model.go index f3349d06..dda510ec 100644 --- a/pkg/query/model/model.go +++ b/pkg/query/model/model.go @@ -317,11 +317,12 @@ type StreamQueryResult interface { // TraceQueryOptions is the options of a trace query. 
type TraceQueryOptions struct { - TimeRange *timestamp.TimeRange SkippingFilter index.Filter + TimeRange *timestamp.TimeRange Order *index.OrderBy TagProjection *TagProjection Name string + TraceIDs []string MaxTraceSize int } @@ -332,6 +333,7 @@ func (t *TraceQueryOptions) Reset() { t.SkippingFilter = nil t.Order = nil t.TagProjection = nil + t.TraceIDs = nil t.MaxTraceSize = 0 } diff --git a/pkg/schema/cache.go b/pkg/schema/cache.go index 638c3147..70ec09d1 100644 --- a/pkg/schema/cache.go +++ b/pkg/schema/cache.go @@ -181,9 +181,15 @@ func (sr *schemaRepo) Watcher() { err = sr.storeResource(evt.Metadata) case EventKindIndexRule: indexRule := evt.Metadata.(*databasev1.IndexRule) + if indexRule.GetMetadata().GetGroup() == "test-trace-group" { + sr.l.Info().Str("group", indexRule.GetMetadata().GetGroup()).Msg("index rule") + } sr.storeIndexRule(indexRule) case EventKindIndexRuleBinding: indexRuleBinding := evt.Metadata.(*databasev1.IndexRuleBinding) + if indexRuleBinding.GetMetadata().GetGroup() == "test-trace-group" { + sr.l.Info().Str("group", indexRuleBinding.GetMetadata().GetGroup()).Msg("index rule binding") + } sr.storeIndexRuleBinding(indexRuleBinding) } case EventDelete: diff --git a/test/cases/init.go b/test/cases/init.go index 9d152f99..ad2a71b6 100644 --- a/test/cases/init.go +++ b/test/cases/init.go @@ -28,6 +28,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/grpchelper" casesmeasuredata "github.com/apache/skywalking-banyandb/test/cases/measure/data" casesstreamdata "github.com/apache/skywalking-banyandb/test/cases/stream/data" + casestrace "github.com/apache/skywalking-banyandb/test/cases/trace/data" ) // Initialize test data. 
@@ -61,7 +62,6 @@ func Initialize(addr string, now time.Time) { casesmeasuredata.Write(conn, "service_cpm_minute", "sw_updated", "service_cpm_minute_updated_data.json", now.Add(10*time.Minute), interval) time.Sleep(5 * time.Second) // trace - // nolint:gocritic - // interval = 500 * time.Millisecond - // casestrace.Write(conn, "sw", now, interval) + interval = 500 * time.Millisecond + casestrace.Write(conn, "sw", now, interval) } diff --git a/test/cases/trace/data/input/all.yml b/test/cases/trace/data/input/all.yml index 8a295c06..301729e6 100644 --- a/test/cases/trace/data/input/all.yml +++ b/test/cases/trace/data/input/all.yml @@ -18,3 +18,8 @@ name: "sw" groups: ["test-trace-group"] tag_projection: ["trace_id"] +criteria: + condition: + name: "trace_id" + op: "eq" + value: "1" diff --git a/test/integration/standalone/query/query_suite_test.go b/test/integration/standalone/query/query_suite_test.go index f760710a..453ad6b3 100644 --- a/test/integration/standalone/query/query_suite_test.go +++ b/test/integration/standalone/query/query_suite_test.go @@ -39,6 +39,7 @@ import ( casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure" casesstream "github.com/apache/skywalking-banyandb/test/cases/stream" casestopn "github.com/apache/skywalking-banyandb/test/cases/topn" + casestrace "github.com/apache/skywalking-banyandb/test/cases/trace" integration_standalone "github.com/apache/skywalking-banyandb/test/integration/standalone" ) @@ -82,10 +83,10 @@ var _ = SynchronizedBeforeSuite(func() []byte { Connection: connection, BaseTime: now, } - // casestrace.SharedContext = helpers.SharedContext{ - // Connection: connection, - // BaseTime: now, - // } + casestrace.SharedContext = helpers.SharedContext{ + Connection: connection, + BaseTime: now, + } Expect(err).NotTo(HaveOccurred()) })