This is an automated email from the ASF dual-hosted git repository.

ButterBright pushed a commit to branch v0.10.x
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 7b2a552a7249efc8b63c195365955d0961c823e1
Author: Gao Hongtao <[email protected]>
AuthorDate: Tue Jun 2 12:57:52 2026 +0800

    Fix trace identity-tag projection and distributed span tag alignment (#1147)
---
 CHANGES.md                                         |   6 +
 banyand/trace/block.go                             |   3 +
 banyand/trace/metadata_test.go                     |   6 +-
 banyand/trace/query.go                             |  26 ++++-
 banyand/trace/query_test.go                        |  15 +++
 pkg/query/logical/trace/trace_plan_distributed.go  |  27 ++++-
 .../logical/trace/trace_plan_distributed_test.go   | 130 +++++++++++++++++++++
 pkg/query/logical/trace/trace_plan_tag_filter.go   |  15 ++-
 8 files changed, 217 insertions(+), 11 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 0f007b0aa..c0928ad31 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -2,6 +2,12 @@
 
 Release Notes.
 
+## 0.10.3
+
+### Bug Fixes
+
+- Fix trace query identity-tag projection: when `trace_id`/`span_id` are 
explicitly projected, reconstruct them from span identity at response build 
time instead of requesting them as stored tags, and preserve tag order with 
null-filled per-span value alignment in the distributed trace result iterator.
+
 ## 0.10.2
 
 ### Bug Fixes
diff --git a/banyand/trace/block.go b/banyand/trace/block.go
index 0db54fdd1..c950e852a 100644
--- a/banyand/trace/block.go
+++ b/banyand/trace/block.go
@@ -457,6 +457,9 @@ func (bc *blockCursor) copyAllTo(r *model.TraceResult) {
        r.Spans = append(r.Spans, bc.spans...)
        r.SpanIDs = append(r.SpanIDs, bc.spanIDs...)
 
+       if bc.tagProjection == nil || len(bc.tagProjection.Names) == 0 {
+               return
+       }
        if len(r.Tags) != len(bc.tagProjection.Names) {
                r.Tags = make([]model.Tag, len(bc.tagProjection.Names))
                for i, name := range bc.tagProjection.Names {
diff --git a/banyand/trace/metadata_test.go b/banyand/trace/metadata_test.go
index 44f18ac51..19b479731 100644
--- a/banyand/trace/metadata_test.go
+++ b/banyand/trace/metadata_test.go
@@ -384,9 +384,13 @@ var _ = Describe("Metadata", func() {
                                        return getFilePartCount(svcs, groupName)
                                }, 
flags.EventuallyTimeout).Should(BeNumerically(">", 
filePartCountAfterFirstBatch))
                                partCountBeforeMerge := getTotalPartCount(svcs, 
groupName)
+                               // The background merge of the freshly-flushed 
parts runs asynchronously and,
+                               // on resource-constrained CI runners (notably 
under -race), can take well
+                               // beyond the default Eventually budget. Allow 
a generous, environment-scaled
+                               // timeout so the wait reflects merge latency 
rather than a fixed ceiling.
                                Eventually(func() int64 {
                                        return getTotalPartCount(svcs, 
groupName)
-                               }, 
flags.EventuallyTimeout).Should(BeNumerically("<", partCountBeforeMerge))
+                               }, 
3*flags.EventuallyTimeout).Should(BeNumerically("<", partCountBeforeMerge))
 
                                Eventually(func(innerGm Gomega) {
                                        spans := 
querySchemaChangeTraceData(svcs, traceName, groupName,
diff --git a/banyand/trace/query.go b/banyand/trace/query.go
index 7845d0eb7..5d6906c08 100644
--- a/banyand/trace/query.go
+++ b/banyand/trace/query.go
@@ -82,10 +82,13 @@ func (t *trace) Query(ctx context.Context, tqo 
model.TraceQueryOptions) (model.T
                }
        }()
 
+       storageTagProjection := omitIdentityTagProjection(tqo.TagProjection, 
t.schema.GetTraceIdTagName(), t.schema.GetSpanIdTagName())
+       storageTQO := tqo
+       storageTQO.TagProjection = storageTagProjection
        result := queryResult{
                ctx:           ctx,
                segments:      segments,
-               tagProjection: tqo.TagProjection,
+               tagProjection: storageTagProjection,
        }
        segmentsNeedRelease = false
        defer func() {
@@ -105,7 +108,7 @@ func (t *trace) Query(ctx context.Context, tqo 
model.TraceQueryOptions) (model.T
        }
 
        qo := queryOptions{
-               TraceQueryOptions: tqo,
+               TraceQueryOptions: storageTQO,
                traceIDs:          tqo.TraceIDs,
                schemaTagTypes:    schemaTagTypes,
        }
@@ -116,7 +119,7 @@ func (t *trace) Query(ctx context.Context, tqo 
model.TraceQueryOptions) (model.T
 
        tables := collectTables(segments)
 
-       sidxInstances, sidxQueryRequest, useSIDXStreaming := 
t.prepareSIDXStreaming(tqo, qo, tables)
+       sidxInstances, sidxQueryRequest, useSIDXStreaming := 
t.prepareSIDXStreaming(storageTQO, qo, tables)
        if len(qo.traceIDs) == 0 && !useSIDXStreaming {
                result.Release()
                return nilResult, nil
@@ -149,6 +152,23 @@ func (t *trace) Query(ctx context.Context, tqo 
model.TraceQueryOptions) (model.T
        return &result, nil
 }
 
+func omitIdentityTagProjection(projection *model.TagProjection, 
traceIDTagName, spanIDTagName string) *model.TagProjection {
+       if projection == nil {
+               return nil
+       }
+       filtered := &model.TagProjection{
+               Family: projection.Family,
+               Names:  make([]string, 0, len(projection.Names)),
+       }
+       for _, name := range projection.Names {
+               if name == traceIDTagName || name == spanIDTagName {
+                       continue
+               }
+               filtered.Names = append(filtered.Names, name)
+       }
+       return filtered
+}
+
 func validateTraceQueryOptions(tqo model.TraceQueryOptions) error {
        if tqo.TimeRange == nil {
                return errors.New("invalid query options: timeRange are 
required")
diff --git a/banyand/trace/query_test.go b/banyand/trace/query_test.go
index 54abde693..5dab69d51 100644
--- a/banyand/trace/query_test.go
+++ b/banyand/trace/query_test.go
@@ -225,6 +225,21 @@ func TestQueryResult(t *testing.T) {
        }
 }
 
+func TestOmitIdentityTagProjection(t *testing.T) {
+       projection := &model.TagProjection{
+               Family: "default",
+               Names:  []string{"trace_id", "service_id", "span_id", 
"duration"},
+       }
+
+       got := omitIdentityTagProjection(projection, "trace_id", "span_id")
+
+       require.Equal(t, &model.TagProjection{
+               Family: "default",
+               Names:  []string{"service_id", "duration"},
+       }, got)
+       require.Equal(t, []string{"trace_id", "service_id", "span_id", 
"duration"}, projection.Names)
+}
+
 func TestQueryResultMultipleBatches(t *testing.T) {
        tests := []struct {
                name         string
diff --git a/pkg/query/logical/trace/trace_plan_distributed.go 
b/pkg/query/logical/trace/trace_plan_distributed.go
index bedc26ae0..5b5116d39 100644
--- a/pkg/query/logical/trace/trace_plan_distributed.go
+++ b/pkg/query/logical/trace/trace_plan_distributed.go
@@ -33,6 +33,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/iter"
        "github.com/apache/skywalking-banyandb/pkg/iter/sort"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/query"
        "github.com/apache/skywalking-banyandb/pkg/query/executor"
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
@@ -377,21 +378,37 @@ func (t *distributedTraceResultIterator) Next() 
(model.TraceResult, bool) {
                SpanIDs: make([]string, 0, len(trace.Spans)),
                Tags:    make([]model.Tag, 0, len(trace.Spans)),
        }
-       tagMap := make(map[string][]*modelv1.TagValue)
+       tagValuesByName := make(map[string][]*modelv1.TagValue)
+       var tagOrder []string
 
-       for _, span := range trace.Spans {
+       for spanIdx, span := range trace.Spans {
                // Add span data
                result.Spans = append(result.Spans, span.Span)
                // Extract tags from this span and aggregate by name
+               spanTags := make(map[string]*modelv1.TagValue, len(span.Tags))
                for _, tag := range span.Tags {
-                       tagMap[tag.Key] = append(tagMap[tag.Key], tag.Value)
+                       if _, exists := tagValuesByName[tag.Key]; !exists {
+                               tagOrder = append(tagOrder, tag.Key)
+                               tagValuesByName[tag.Key] = 
make([]*modelv1.TagValue, spanIdx)
+                               for fillIdx := range tagValuesByName[tag.Key] {
+                                       tagValuesByName[tag.Key][fillIdx] = 
pbv1.NullTagValue
+                               }
+                       }
+                       spanTags[tag.Key] = tag.Value
+               }
+               for _, tagName := range tagOrder {
+                       tagValue, exists := spanTags[tagName]
+                       if !exists {
+                               tagValue = pbv1.NullTagValue
+                       }
+                       tagValuesByName[tagName] = 
append(tagValuesByName[tagName], tagValue)
                }
                result.SpanIDs = append(result.SpanIDs, span.SpanId)
        }
-       for tagName, values := range tagMap {
+       for _, tagName := range tagOrder {
                result.Tags = append(result.Tags, model.Tag{
                        Name:   tagName,
-                       Values: values,
+                       Values: tagValuesByName[tagName],
                })
        }
        return result, true
diff --git a/pkg/query/logical/trace/trace_plan_distributed_test.go 
b/pkg/query/logical/trace/trace_plan_distributed_test.go
new file mode 100644
index 000000000..9464627d2
--- /dev/null
+++ b/pkg/query/logical/trace/trace_plan_distributed_test.go
@@ -0,0 +1,130 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/require"
+
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       tracev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+)
+
+func TestDistributedTraceResultIteratorPreservesTagOrderAndSpanAlignment(t 
*testing.T) {
+       iterator := &distributedTraceResultIterator{
+               traces: []*tracev1.InternalTrace{
+                       {
+                               TraceId: "trace-1",
+                               Spans: []*tracev1.Span{
+                                       {
+                                               Span:   []byte("span-1"),
+                                               SpanId: "span-1",
+                                               Tags: []*modelv1.Tag{
+                                                       {Key: "service_id", 
Value: strTagValueForDistributedTest("svc")},
+                                                       {Key: "trace_id", 
Value: pbv1.NullTagValue},
+                                                       {Key: "trace_id", 
Value: strTagValueForDistributedTest("trace-1")},
+                                                       {Key: "span_id", Value: 
strTagValueForDistributedTest("span-1")},
+                                               },
+                                       },
+                                       {
+                                               Span:   []byte("span-2"),
+                                               SpanId: "span-2",
+                                               Tags: []*modelv1.Tag{
+                                                       {Key: "service_id", 
Value: strTagValueForDistributedTest("svc")},
+                                                       {Key: "trace_id", 
Value: strTagValueForDistributedTest("trace-1")},
+                                                       {Key: "span_id", Value: 
strTagValueForDistributedTest("span-2")},
+                                               },
+                                       },
+                               },
+                       },
+               },
+       }
+
+       got, ok := iterator.Next()
+
+       require.True(t, ok)
+       require.Len(t, got.Tags, 3)
+       require.Equal(t, "service_id", got.Tags[0].Name)
+       require.Equal(t, "trace_id", got.Tags[1].Name)
+       require.Equal(t, "span_id", got.Tags[2].Name)
+       require.Equal(t, "trace-1", got.Tags[1].Values[0].GetStr().GetValue())
+       require.Equal(t, "trace-1", got.Tags[1].Values[1].GetStr().GetValue())
+       require.Equal(t, "span-1", got.Tags[2].Values[0].GetStr().GetValue())
+       require.Equal(t, "span-2", got.Tags[2].Values[1].GetStr().GetValue())
+}
+
+func TestDistributedTraceResultIteratorNullFillsMissingSpanTags(t *testing.T) {
+       iterator := &distributedTraceResultIterator{
+               traces: []*tracev1.InternalTrace{
+                       {
+                               TraceId: "trace-1",
+                               Spans: []*tracev1.Span{
+                                       {
+                                               Span:   []byte("span-1"),
+                                               SpanId: "span-1",
+                                               Tags: []*modelv1.Tag{
+                                                       {Key: "service_id", 
Value: strTagValueForDistributedTest("svc")},
+                                                       {Key: "only_on_first", 
Value: strTagValueForDistributedTest("first")},
+                                               },
+                                       },
+                                       {
+                                               Span:   []byte("span-2"),
+                                               SpanId: "span-2",
+                                               Tags: []*modelv1.Tag{
+                                                       {Key: "service_id", 
Value: strTagValueForDistributedTest("svc")},
+                                                       {Key: "only_on_second", 
Value: strTagValueForDistributedTest("second")},
+                                               },
+                                       },
+                               },
+                       },
+               },
+       }
+
+       got, ok := iterator.Next()
+
+       require.True(t, ok)
+       spanCount := len(got.Spans)
+       require.Equal(t, 2, spanCount)
+
+       values := make(map[string][]*modelv1.TagValue, len(got.Tags))
+       for _, tag := range got.Tags {
+               require.Lenf(t, tag.Values, spanCount, "tag %q must have one 
value per span", tag.Name)
+               values[tag.Name] = tag.Values
+       }
+
+       require.Equal(t, "svc", values["service_id"][0].GetStr().GetValue())
+       require.Equal(t, "svc", values["service_id"][1].GetStr().GetValue())
+
+       // only_on_first is present on span-1 and forward-filled with NULL on 
span-2.
+       require.Equal(t, "first", 
values["only_on_first"][0].GetStr().GetValue())
+       require.Same(t, pbv1.NullTagValue, values["only_on_first"][1])
+
+       // only_on_second is back-filled with NULL on span-1 and present on 
span-2.
+       require.Same(t, pbv1.NullTagValue, values["only_on_second"][0])
+       require.Equal(t, "second", 
values["only_on_second"][1].GetStr().GetValue())
+}
+
+func strTagValueForDistributedTest(value string) *modelv1.TagValue {
+       return &modelv1.TagValue{
+               Value: &modelv1.TagValue_Str{
+                       Str: &modelv1.Str{Value: value},
+               },
+       }
+}
diff --git a/pkg/query/logical/trace/trace_plan_tag_filter.go 
b/pkg/query/logical/trace/trace_plan_tag_filter.go
index d407a7607..25b06f3fb 100644
--- a/pkg/query/logical/trace/trace_plan_tag_filter.go
+++ b/pkg/query/logical/trace/trace_plan_tag_filter.go
@@ -117,8 +117,8 @@ func (uis *unresolvedTraceTagFilter) Analyze(s 
logical.Schema) (logical.Plan, er
                conditionSchema = s.ProjTags(conditionTagRefs...)
        }
 
-       // Deduplicate tag names
-       ctx.projectionTags.Names = deduplicateStrings(ctx.projectionTags.Names)
+       // Deduplicate tag names and leave identity tags to response 
construction.
+       ctx.projectionTags.Names = 
omitIdentityProjectionTagNames(deduplicateStrings(ctx.projectionTags.Names), 
uis.traceIDTagName, uis.spanIDTagName)
 
        // Create tag references if we have any projection tags
        if len(ctx.projectionTags.Names) > 0 {
@@ -212,6 +212,17 @@ func deduplicateStrings(strings []string) []string {
        return result
 }
 
+func omitIdentityProjectionTagNames(tagNames []string, traceIDTagName, 
spanIDTagName string) []string {
+       result := make([]string, 0, len(tagNames))
+       for _, tagName := range tagNames {
+               if tagName == traceIDTagName || tagName == spanIDTagName {
+                       continue
+               }
+               result = append(result, tagName)
+       }
+       return result
+}
+
 // buildTraceFilter builds a filter for trace queries and returns both the 
filter and collected tag names.
 // Unlike stream, trace only needs skipping filter.
 // Returns min/max int64 values for the orderByTag if provided, otherwise 
returns math.MaxInt64, math.MinInt64.

Reply via email to