This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch vectorized-query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 02d0c194018a71527eee37d2f8df887c6c8815b2 Author: Hongtao Gao <[email protected]> AuthorDate: Fri May 15 10:06:04 2026 +0000 feat(query/vectorized/measure/plan): G9d hidden criteria tags egress-strip wrapper --- pkg/query/vectorized/measure/plan/dispatch.go | 32 +++++--- pkg/query/vectorized/measure/plan/dispatch_test.go | 95 ++++++++++++++++++++++ pkg/query/vectorized/measure/plan/hidden_tags.go | 94 +++++++++++++++++++++ .../standalone/query/vectorized_test.go | 10 ++- 4 files changed, 216 insertions(+), 15 deletions(-) diff --git a/pkg/query/vectorized/measure/plan/dispatch.go b/pkg/query/vectorized/measure/plan/dispatch.go index baa870a2b..8a2a7b573 100644 --- a/pkg/query/vectorized/measure/plan/dispatch.go +++ b/pkg/query/vectorized/measure/plan/dispatch.go @@ -93,8 +93,9 @@ func FellThroughCount() int64 { return fellThroughCount.Load() } // - request must NOT carry Top (BatchTop's single-heap semantic differs // from the row path's per-timestamp top-N) // - request must carry TimeRange (storage requires a bounded window) -// - request must NOT have hidden criteria tags (those need an egress -// strip wrapper that v1 dispatch does not implement) +// - hidden criteria tags (criteria tags absent from the projection) +// are projected for storage-side filtering, then stripped at egress +// by hiddenTagsMIterator so the wire format is byte-identical // - measureSchema and logicalSchema must be non-nil func Dispatch( ctx context.Context, @@ -176,9 +177,11 @@ func Dispatch( } // Hidden-tag detection: criteria may reference tags that are NOT in - // the projection (they're needed only as filter inputs). The row - // path strips them at egress via hiddenTagsMIterator. v1 dispatch - // does not implement that strip yet, so fall through when present. + // the projection (they're needed only as filter inputs). Such tags + // are projected so storage can evaluate the criteria, then stripped + // at egress by hiddenTagsMIterator so the wire bytes stay identical + // to a query without hidden criteria tags (the row path does the + // same in unresolvedIndexScan.Analyze + resultMIterator). projectedTagNames := projectedNames(req.GetTagProjection()) entityList := logicalSchema.EntityList() entityMap := make(map[string]int, len(entityList)) @@ -191,13 +194,14 @@ func Dispatch( for _, tf := range measureSchema.GetTagFamilies() { familyNames = append(familyNames, tf.GetName()) } - hidden, _ := logical.CollectHiddenCriteriaTags( + hidden, hiddenExtras := logical.CollectHiddenCriteriaTags( req.GetCriteria(), projectedTagNames, entityMap, logicalSchema, func() []string { return familyNames }, ) - if !hidden.IsEmpty() { - return nil, "", false, nil - } + // analyzeReq carries the hidden tags in its TagProjection so the Scan + // materializes them for storage-side filtering; req itself is left + // unchanged for the index.Query / opts wiring below. + analyzeReq := augmentRequestWithHiddenTags(req, hiddenExtras) indexOrder, orderErr := resolveOrderBy(req.GetOrderBy(), logicalSchema) if orderErr != nil { @@ -218,8 +222,9 @@ func Dispatch( return nil, "", true, fmt.Errorf("vec dispatch: build query: %w", qErr) } - // Build the structural plan tree. - p, analyzeErr := Analyze(req, measureSchema) + // Build the structural plan tree from analyzeReq so the Scan's + // BatchSchema + opts.TagProjection carry the hidden criteria tags. + p, analyzeErr := Analyze(analyzeReq, measureSchema) if analyzeErr != nil { return nil, "", true, fmt.Errorf("vec dispatch: analyze: %w", analyzeErr) } @@ -285,6 +290,11 @@ func Dispatch( // here. return nil, "", true, fmt.Errorf("vec dispatch: execute: %w", execErr) } + if !hidden.IsEmpty() { + // Strip the projected-for-filtering hidden tags before + // serialization so the wire bytes match a query without them. + iter = &hiddenTagsMIterator{inner: iter, hiddenTags: hidden} + } return iter, p.String(), true, nil } diff --git a/pkg/query/vectorized/measure/plan/dispatch_test.go b/pkg/query/vectorized/measure/plan/dispatch_test.go index 08f9dcb6c..a809995b2 100644 --- a/pkg/query/vectorized/measure/plan/dispatch_test.go +++ b/pkg/query/vectorized/measure/plan/dispatch_test.go @@ -27,6 +27,7 @@ import ( commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/pkg/query/logical" logicalmeasure "github.com/apache/skywalking-banyandb/pkg/query/logical/measure" "github.com/apache/skywalking-banyandb/pkg/query/model" measure "github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure" @@ -535,6 +536,100 @@ func TestDispatch_GroupByAggCovered_ReachesEcQuery(t *testing.T) { } } +// TestAugmentRequestWithHiddenTags_AppendsFamiliesAfterVisible covers the +// G9d projection-extension mechanism: hidden criteria tags (grouped by +// family) are appended AFTER the visible projection so the visible tags +// keep their projected order, the caller's request is never mutated, and +// an empty extras slice returns the request unchanged (no clone). +func TestAugmentRequestWithHiddenTags_AppendsFamiliesAfterVisible(t *testing.T) { + req := bareReq() // TagProjection: default=[svc] + extras := [][]*logical.Tag{ + {logical.NewTag("default", "region")}, + {logical.NewTag("extra", "zone")}, + } + got := augmentRequestWithHiddenTags(req, extras) + if got == req { + t.Fatal("augment must return a clone when extras are present") + } + if len(req.GetTagProjection().GetTagFamilies()) != 1 || + req.GetTagProjection().GetTagFamilies()[0].GetTags()[0] != "svc" { + t.Fatalf("caller's req.TagProjection must be untouched, got %+v", req.GetTagProjection()) + } + fams := got.GetTagProjection().GetTagFamilies() + if len(fams) != 3 { + t.Fatalf("want 3 families (1 visible + 2 hidden), got %d: %+v", len(fams), fams) + } + if fams[0].GetName() != "default" || fams[0].GetTags()[0] != "svc" { + t.Fatalf("visible family must stay first, got %+v", fams[0]) + } + if fams[1].GetName() != "default" || fams[1].GetTags()[0] != "region" { + t.Fatalf("first hidden family wrong, got %+v", fams[1]) + } + if fams[2].GetName() != "extra" || fams[2].GetTags()[0] != "zone" { + t.Fatalf("second hidden family wrong, got %+v", fams[2]) + } + if same := augmentRequestWithHiddenTags(req, nil); same != req { + t.Fatal("augment with no extras must return the original request") + } +} + +// TestHiddenTagsMIterator_StripsHiddenTagsFromCurrent proves the egress +// wrapper removes exactly the hidden tags from each Current() DataPoint, +// leaving visible tags untouched — the same contract as the row path's +// hiddenTagsMIterator, which is what keeps the wire bytes identical. +func TestHiddenTagsMIterator_StripsHiddenTagsFromCurrent(t *testing.T) { + build := func() []*measurev1.InternalDataPoint { + return []*measurev1.InternalDataPoint{{ + DataPoint: &measurev1.DataPoint{ + TagFamilies: []*modelv1.TagFamily{{ + Name: "default", + Tags: []*modelv1.Tag{ + {Key: "svc", Value: &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "a"}}}}, + {Key: "region", Value: &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "us"}}}}, + }, + }}, + }, + }} + } + hidden := logical.NewHiddenTagSet() + hidden.Add("region") + it := &hiddenTagsMIterator{inner: &stubMIterator{rows: build()}, hiddenTags: hidden} + if !it.Next() { + t.Fatal("Next must advance once") + } + cur := it.Current() + if len(cur) != 1 || len(cur[0].DataPoint.TagFamilies) != 1 { + t.Fatalf("expected one family after strip, got %+v", cur) + } + tags := cur[0].DataPoint.TagFamilies[0].Tags + if len(tags) != 1 || tags[0].Key != "svc" { + t.Fatalf("only visible tag 'svc' must remain, got %+v", tags) + } + if it.Close() != nil { + t.Fatal("Close must propagate inner Close (nil)") + } +} + +// stubMIterator is a one-shot executor.MIterator over a fixed row slice. +type stubMIterator struct { + rows []*measurev1.InternalDataPoint + pos int +} + +func (s *stubMIterator) Next() bool { + s.pos++ + return s.pos <= len(s.rows) +} + +func (s *stubMIterator) Current() []*measurev1.InternalDataPoint { + if s.pos < 1 || s.pos > len(s.rows) { + return nil + } + return s.rows[s.pos-1 : s.pos] +} + +func (s *stubMIterator) Close() error { return nil } + // TestDispatch_QueryError_BubblesUp covers the error propagation when // the storage query itself fails. Dispatch must report (nil, "", true, // err) so the caller surfaces the error rather than re-trying the row diff --git a/pkg/query/vectorized/measure/plan/hidden_tags.go b/pkg/query/vectorized/measure/plan/hidden_tags.go new file mode 100644 index 000000000..bb0726a06 --- /dev/null +++ b/pkg/query/vectorized/measure/plan/hidden_tags.go @@ -0,0 +1,94 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package plan + +import ( + "google.golang.org/protobuf/proto" + + measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/pkg/query/executor" + "github.com/apache/skywalking-banyandb/pkg/query/logical" +) + +// augmentRequestWithHiddenTags returns a request whose TagProjection also +// carries the hidden criteria tags (grouped by family) so the vec Scan +// materializes them as columns. Storage needs the hidden tag values to +// evaluate criteria conditions on non-indexed tags; the row path performs +// the same projection extension in unresolvedIndexScan.Analyze before +// stripping the hidden tags at egress. +// +// The original request is left untouched: a shallow proto clone is taken +// and only its TagProjection is replaced. Hidden families are appended +// AFTER the visible projection families so the visible tags retain their +// projected order. BatchSchema coalesces same-named families into one +// TagFamilyGroup, so a hidden tag whose family is already projected lands +// in that family's group after the visible columns — and is removed again +// by the egress strip, leaving the wire bytes identical to a query with +// no hidden tags. +func augmentRequestWithHiddenTags(req *measurev1.QueryRequest, extras [][]*logical.Tag) *measurev1.QueryRequest { + if len(extras) == 0 { + return req + } + families := make([]*modelv1.TagProjection_TagFamily, 0) + if tp := req.GetTagProjection(); tp != nil { + families = append(families, tp.GetTagFamilies()...) + } + for _, group := range extras { + if len(group) == 0 { + continue + } + names := make([]string, 0, len(group)) + for _, t := range group { + names = append(names, t.GetTagName()) + } + families = append(families, &modelv1.TagProjection_TagFamily{ + Name: group[0].GetFamilyName(), + Tags: names, + }) + } + cloned := proto.Clone(req).(*measurev1.QueryRequest) + cloned.TagProjection = &modelv1.TagProjection{TagFamilies: families} + return cloned +} + +// hiddenTagsMIterator wraps an MIterator and strips hidden criteria tags +// from each Current() result. It is the vec counterpart of the row path's +// hiddenTagsMIterator (pkg/query/logical/measure): the hidden tags were +// projected so storage could filter on them, and are removed here so the +// serialized DataPoints carry only the requested projection — keeping the +// wire bytes byte-identical to a query without hidden criteria tags. +type hiddenTagsMIterator struct { + inner executor.MIterator + hiddenTags logical.HiddenTagSet +} + +func (h *hiddenTagsMIterator) Next() bool { return h.inner.Next() } + +func (h *hiddenTagsMIterator) Current() []*measurev1.InternalDataPoint { + dps := h.inner.Current() + for _, dp := range dps { + if dp == nil || dp.DataPoint == nil { + continue + } + dp.DataPoint.TagFamilies = h.hiddenTags.StripHiddenTags(dp.DataPoint.TagFamilies) + } + return dps +} + +func (h *hiddenTagsMIterator) Close() error { return h.inner.Close() } diff --git a/test/integration/standalone/query/vectorized_test.go b/test/integration/standalone/query/vectorized_test.go index abd037352..6be0a7432 100644 --- a/test/integration/standalone/query/vectorized_test.go +++ b/test/integration/standalone/query/vectorized_test.go @@ -47,10 +47,12 @@ import ( // // With G8d (top-level vec dispatch) wired, plain measure queries take the // new pkg/query/vectorized/measure/plan.Dispatch path instead of the legacy -// leaf-substitution at localIndexScan.maybeVectorized. GroupBy/Agg/Top and -// queries with hidden criteria tags continue through the row plan with leaf -// substitution. The AfterAll assertion below uses vecplan.HandledCount to -// confirm dispatch actually fires for at least one query — protecting +// leaf-substitution at localIndexScan.maybeVectorized. Queries with hidden +// criteria tags (G9d) also route through Dispatch: the hidden tags are +// projected for storage filtering and stripped at egress so the wire bytes +// match the row path. GroupBy/Agg/Top continue through the row plan with +// leaf substitution. The AfterAll assertion below uses vecplan.HandledCount +// to confirm dispatch actually fires for at least one query — protecting // against a silent regression where the eligibility gate excludes // everything. //
