This is an automated email from the ASF dual-hosted git repository. ButterBright pushed a commit to branch v0.10.x in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 7b2a552a7249efc8b63c195365955d0961c823e1 Author: Gao Hongtao <[email protected]> AuthorDate: Tue Jun 2 12:57:52 2026 +0800 Fix trace identity-tag projection and distributed span tag alignment (#1147) --- CHANGES.md | 6 + banyand/trace/block.go | 3 + banyand/trace/metadata_test.go | 6 +- banyand/trace/query.go | 26 ++++- banyand/trace/query_test.go | 15 +++ pkg/query/logical/trace/trace_plan_distributed.go | 27 ++++- .../logical/trace/trace_plan_distributed_test.go | 130 +++++++++++++++++++++ pkg/query/logical/trace/trace_plan_tag_filter.go | 15 ++- 8 files changed, 217 insertions(+), 11 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 0f007b0aa..c0928ad31 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,6 +2,12 @@ Release Notes. +## 0.10.3 + +### Bug Fixes + +- Fix trace query identity-tag projection: when `trace_id`/`span_id` are explicitly projected, reconstruct them from span identity at response build time instead of requesting them as stored tags, and preserve tag order with null-filled per-span value alignment in the distributed trace result iterator. + ## 0.10.2 ### Bug Fixes diff --git a/banyand/trace/block.go b/banyand/trace/block.go index 0db54fdd1..c950e852a 100644 --- a/banyand/trace/block.go +++ b/banyand/trace/block.go @@ -457,6 +457,9 @@ func (bc *blockCursor) copyAllTo(r *model.TraceResult) { r.Spans = append(r.Spans, bc.spans...) r.SpanIDs = append(r.SpanIDs, bc.spanIDs...) + if bc.tagProjection == nil || len(bc.tagProjection.Names) == 0 { + return + } if len(r.Tags) != len(bc.tagProjection.Names) { r.Tags = make([]model.Tag, len(bc.tagProjection.Names)) for i, name := range bc.tagProjection.Names { diff --git a/banyand/trace/metadata_test.go b/banyand/trace/metadata_test.go index 44f18ac51..19b479731 100644 --- a/banyand/trace/metadata_test.go +++ b/banyand/trace/metadata_test.go @@ -384,9 +384,13 @@ var _ = Describe("Metadata", func() { return getFilePartCount(svcs, groupName) }, flags.EventuallyTimeout).Should(BeNumerically(">", filePartCountAfterFirstBatch)) partCountBeforeMerge := getTotalPartCount(svcs, groupName) + // The background merge of the freshly-flushed parts runs asynchronously and, + // on resource-constrained CI runners (notably under -race), can take well + // beyond the default Eventually budget. Allow a generous, environment-scaled + // timeout so the wait reflects merge latency rather than a fixed ceiling. Eventually(func() int64 { return getTotalPartCount(svcs, groupName) - }, flags.EventuallyTimeout).Should(BeNumerically("<", partCountBeforeMerge)) + }, 3*flags.EventuallyTimeout).Should(BeNumerically("<", partCountBeforeMerge)) Eventually(func(innerGm Gomega) { spans := querySchemaChangeTraceData(svcs, traceName, groupName, diff --git a/banyand/trace/query.go b/banyand/trace/query.go index 7845d0eb7..5d6906c08 100644 --- a/banyand/trace/query.go +++ b/banyand/trace/query.go @@ -82,10 +82,13 @@ func (t *trace) Query(ctx context.Context, tqo model.TraceQueryOptions) (model.T } }() + storageTagProjection := omitIdentityTagProjection(tqo.TagProjection, t.schema.GetTraceIdTagName(), t.schema.GetSpanIdTagName()) + storageTQO := tqo + storageTQO.TagProjection = storageTagProjection result := queryResult{ ctx: ctx, segments: segments, - tagProjection: tqo.TagProjection, + tagProjection: storageTagProjection, } segmentsNeedRelease = false defer func() { @@ -105,7 +108,7 @@ func (t *trace) Query(ctx context.Context, tqo model.TraceQueryOptions) (model.T } qo := queryOptions{ - TraceQueryOptions: tqo, + TraceQueryOptions: storageTQO, traceIDs: tqo.TraceIDs, schemaTagTypes: schemaTagTypes, } @@ -116,7 +119,7 @@ func (t *trace) Query(ctx context.Context, tqo model.TraceQueryOptions) (model.T tables := collectTables(segments) - sidxInstances, sidxQueryRequest, useSIDXStreaming := t.prepareSIDXStreaming(tqo, qo, tables) + sidxInstances, sidxQueryRequest, useSIDXStreaming := t.prepareSIDXStreaming(storageTQO, qo, tables) if len(qo.traceIDs) == 0 && !useSIDXStreaming { result.Release() return nilResult, nil @@ -149,6 +152,23 @@ func (t *trace) Query(ctx context.Context, tqo model.TraceQueryOptions) (model.T return &result, nil } +func omitIdentityTagProjection(projection *model.TagProjection, traceIDTagName, spanIDTagName string) *model.TagProjection { + if projection == nil { + return nil + } + filtered := &model.TagProjection{ + Family: projection.Family, + Names: make([]string, 0, len(projection.Names)), + } + for _, name := range projection.Names { + if name == traceIDTagName || name == spanIDTagName { + continue + } + filtered.Names = append(filtered.Names, name) + } + return filtered +} + func validateTraceQueryOptions(tqo model.TraceQueryOptions) error { if tqo.TimeRange == nil { return errors.New("invalid query options: timeRange are required") diff --git a/banyand/trace/query_test.go b/banyand/trace/query_test.go index 54abde693..5dab69d51 100644 --- a/banyand/trace/query_test.go +++ b/banyand/trace/query_test.go @@ -225,6 +225,21 @@ func TestQueryResult(t *testing.T) { } } +func TestOmitIdentityTagProjection(t *testing.T) { + projection := &model.TagProjection{ + Family: "default", + Names: []string{"trace_id", "service_id", "span_id", "duration"}, + } + + got := omitIdentityTagProjection(projection, "trace_id", "span_id") + + require.Equal(t, &model.TagProjection{ + Family: "default", + Names: []string{"service_id", "duration"}, + }, got) + require.Equal(t, []string{"trace_id", "service_id", "span_id", "duration"}, projection.Names) +} + func TestQueryResultMultipleBatches(t *testing.T) { tests := []struct { name string diff --git a/pkg/query/logical/trace/trace_plan_distributed.go b/pkg/query/logical/trace/trace_plan_distributed.go index bedc26ae0..5b5116d39 100644 --- a/pkg/query/logical/trace/trace_plan_distributed.go +++ b/pkg/query/logical/trace/trace_plan_distributed.go @@ -33,6 +33,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/iter" "github.com/apache/skywalking-banyandb/pkg/iter/sort" "github.com/apache/skywalking-banyandb/pkg/logger" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/query" "github.com/apache/skywalking-banyandb/pkg/query/executor" "github.com/apache/skywalking-banyandb/pkg/query/logical" @@ -377,21 +378,37 @@ func (t *distributedTraceResultIterator) Next() (model.TraceResult, bool) { SpanIDs: make([]string, 0, len(trace.Spans)), Tags: make([]model.Tag, 0, len(trace.Spans)), } - tagMap := make(map[string][]*modelv1.TagValue) + tagValuesByName := make(map[string][]*modelv1.TagValue) + var tagOrder []string - for _, span := range trace.Spans { + for spanIdx, span := range trace.Spans { // Add span data result.Spans = append(result.Spans, span.Span) // Extract tags from this span and aggregate by name + spanTags := make(map[string]*modelv1.TagValue, len(span.Tags)) for _, tag := range span.Tags { - tagMap[tag.Key] = append(tagMap[tag.Key], tag.Value) + if _, exists := tagValuesByName[tag.Key]; !exists { + tagOrder = append(tagOrder, tag.Key) + tagValuesByName[tag.Key] = make([]*modelv1.TagValue, spanIdx) + for fillIdx := range tagValuesByName[tag.Key] { + tagValuesByName[tag.Key][fillIdx] = pbv1.NullTagValue + } + } + spanTags[tag.Key] = tag.Value + } + for _, tagName := range tagOrder { + tagValue, exists := spanTags[tagName] + if !exists { + tagValue = pbv1.NullTagValue + } + tagValuesByName[tagName] = append(tagValuesByName[tagName], tagValue) } result.SpanIDs = append(result.SpanIDs, span.SpanId) } - for tagName, values := range tagMap { + for _, tagName := range tagOrder { result.Tags = append(result.Tags, model.Tag{ Name: tagName, - Values: values, + Values: tagValuesByName[tagName], }) } return result, true diff --git a/pkg/query/logical/trace/trace_plan_distributed_test.go b/pkg/query/logical/trace/trace_plan_distributed_test.go new file mode 100644 index 000000000..9464627d2 --- /dev/null +++ b/pkg/query/logical/trace/trace_plan_distributed_test.go @@ -0,0 +1,130 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package trace + +import ( + "testing" + + "github.com/stretchr/testify/require" + + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + tracev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" +) + +func TestDistributedTraceResultIteratorPreservesTagOrderAndSpanAlignment(t *testing.T) { + iterator := &distributedTraceResultIterator{ + traces: []*tracev1.InternalTrace{ + { + TraceId: "trace-1", + Spans: []*tracev1.Span{ + { + Span: []byte("span-1"), + SpanId: "span-1", + Tags: []*modelv1.Tag{ + {Key: "service_id", Value: strTagValueForDistributedTest("svc")}, + {Key: "trace_id", Value: pbv1.NullTagValue}, + {Key: "trace_id", Value: strTagValueForDistributedTest("trace-1")}, + {Key: "span_id", Value: strTagValueForDistributedTest("span-1")}, + }, + }, + { + Span: []byte("span-2"), + SpanId: "span-2", + Tags: []*modelv1.Tag{ + {Key: "service_id", Value: strTagValueForDistributedTest("svc")}, + {Key: "trace_id", Value: strTagValueForDistributedTest("trace-1")}, + {Key: "span_id", Value: strTagValueForDistributedTest("span-2")}, + }, + }, + }, + }, + }, + } + + got, ok := iterator.Next() + + require.True(t, ok) + require.Len(t, got.Tags, 3) + require.Equal(t, "service_id", got.Tags[0].Name) + require.Equal(t, "trace_id", got.Tags[1].Name) + require.Equal(t, "span_id", got.Tags[2].Name) + require.Equal(t, "trace-1", got.Tags[1].Values[0].GetStr().GetValue()) + require.Equal(t, "trace-1", got.Tags[1].Values[1].GetStr().GetValue()) + require.Equal(t, "span-1", got.Tags[2].Values[0].GetStr().GetValue()) + require.Equal(t, "span-2", got.Tags[2].Values[1].GetStr().GetValue()) +} + +func TestDistributedTraceResultIteratorNullFillsMissingSpanTags(t *testing.T) { + iterator := &distributedTraceResultIterator{ + traces: []*tracev1.InternalTrace{ + { + TraceId: "trace-1", + Spans: []*tracev1.Span{ + { + Span: []byte("span-1"), + SpanId: "span-1", + Tags: []*modelv1.Tag{ + {Key: "service_id", Value: strTagValueForDistributedTest("svc")}, + {Key: "only_on_first", Value: strTagValueForDistributedTest("first")}, + }, + }, + { + Span: []byte("span-2"), + SpanId: "span-2", + Tags: []*modelv1.Tag{ + {Key: "service_id", Value: strTagValueForDistributedTest("svc")}, + {Key: "only_on_second", Value: strTagValueForDistributedTest("second")}, + }, + }, + }, + }, + }, + } + + got, ok := iterator.Next() + + require.True(t, ok) + spanCount := len(got.Spans) + require.Equal(t, 2, spanCount) + + values := make(map[string][]*modelv1.TagValue, len(got.Tags)) + for _, tag := range got.Tags { + require.Lenf(t, tag.Values, spanCount, "tag %q must have one value per span", tag.Name) + values[tag.Name] = tag.Values + } + + require.Equal(t, "svc", values["service_id"][0].GetStr().GetValue()) + require.Equal(t, "svc", values["service_id"][1].GetStr().GetValue()) + + // only_on_first is present on span-1 and forward-filled with NULL on span-2. + require.Equal(t, "first", values["only_on_first"][0].GetStr().GetValue()) + require.Same(t, pbv1.NullTagValue, values["only_on_first"][1]) + + // only_on_second is back-filled with NULL on span-1 and present on span-2. + require.Same(t, pbv1.NullTagValue, values["only_on_second"][0]) + require.Equal(t, "second", values["only_on_second"][1].GetStr().GetValue()) +} + +func strTagValueForDistributedTest(value string) *modelv1.TagValue { + return &modelv1.TagValue{ + Value: &modelv1.TagValue_Str{ + Str: &modelv1.Str{Value: value}, + }, + } +} diff --git a/pkg/query/logical/trace/trace_plan_tag_filter.go b/pkg/query/logical/trace/trace_plan_tag_filter.go index d407a7607..25b06f3fb 100644 --- a/pkg/query/logical/trace/trace_plan_tag_filter.go +++ b/pkg/query/logical/trace/trace_plan_tag_filter.go @@ -117,8 +117,8 @@ func (uis *unresolvedTraceTagFilter) Analyze(s logical.Schema) (logical.Plan, er conditionSchema = s.ProjTags(conditionTagRefs...) } - // Deduplicate tag names - ctx.projectionTags.Names = deduplicateStrings(ctx.projectionTags.Names) + // Deduplicate tag names and leave identity tags to response construction. + ctx.projectionTags.Names = omitIdentityProjectionTagNames(deduplicateStrings(ctx.projectionTags.Names), uis.traceIDTagName, uis.spanIDTagName) // Create tag references if we have any projection tags if len(ctx.projectionTags.Names) > 0 { @@ -212,6 +212,17 @@ func deduplicateStrings(strings []string) []string { return result } +func omitIdentityProjectionTagNames(tagNames []string, traceIDTagName, spanIDTagName string) []string { + result := make([]string, 0, len(tagNames)) + for _, tagName := range tagNames { + if tagName == traceIDTagName || tagName == spanIDTagName { + continue + } + result = append(result, tagName) + } + return result +} + // buildTraceFilter builds a filter for trace queries and returns both the filter and collected tag names. // Unlike stream, trace only needs skipping filter. // Returns min/max int64 values for the orderByTag if provided, otherwise returns math.MaxInt64, math.MinInt64.
