This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch vectorized-query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit c4e3ea8666f743c3a4e62568ea100568cc543eab Author: Hongtao Gao <[email protected]> AuthorDate: Wed May 13 22:21:21 2026 +0000 feat(query/vectorized/measure/plan): dispatch eligibility hardening + observability counters (G8e) Two G8e changes packaged together because the integration parity gate surfaced both at once. 1. Tighten the eligibility gate so dispatch falls through on requests the v1 wire-up cannot yet handle correctly: - req.GetOrderBy() != nil. The row path applies OrderBy via the logical.NewPushDownOrder optimizer rule (post-Analyze). Dispatch does not invoke those rules and would silently emit unsorted rows. Fall through until dispatch threads order_by into model.MeasureQueryOptions.Order. - Unknown tag or field in the request's projection. The row path's ValidateProjectionTags / ValidateProjectionFields rejects these with a descriptive error (and the WantErr=true fixtures in test/cases/measure depend on it). Dispatch falls through so the row path surfaces the canonical error message. The two gates plug the only parity regressions observed when the 488-spec measure/topn suite was re-run against a vec-enabled standalone post-G8d. With them in place, every spec passes. 2. Add process-wide atomic counters HandledCount + FellThroughCount so the integration parity gate can assert dispatch is being exercised rather than silently 0%-covered. Implemented via a deferred increment on Dispatch's named returns so every handled / fall-through branch is counted automatically; errors are deliberately uncounted. Unit tests cover every new gate (OrderBy, unknown tag, unknown field) and a counter-snapshot test confirming three fallthrough calls produce FellThroughCount delta = 3 with HandledCount delta = 0. --- pkg/query/vectorized/measure/plan/dispatch.go | 104 +++++++++++++++++- pkg/query/vectorized/measure/plan/dispatch_test.go | 116 +++++++++++++++++++++ 2 files changed, 219 insertions(+), 1 deletion(-) diff --git a/pkg/query/vectorized/measure/plan/dispatch.go b/pkg/query/vectorized/measure/plan/dispatch.go index a57228050..4eaeca31b 100644 --- a/pkg/query/vectorized/measure/plan/dispatch.go +++ b/pkg/query/vectorized/measure/plan/dispatch.go @@ -20,6 +20,7 @@ package plan import ( "context" "fmt" + "sync/atomic" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" @@ -36,6 +37,29 @@ import ( "github.com/apache/skywalking-banyandb/pkg/timestamp" ) +// Process-wide observability counters for G8e parity testing. Tests +// assert HandledCount > 0 to prove dispatch fires (vs silently falling +// through), and FellThroughCount > 0 to prove the row path still serves +// queries Dispatch is not yet ready to take. +// +// Counters are best-effort: under concurrent queries the deltas are +// accurate, but a test reading them across a workload may observe +// updates from unrelated queries. Snapshot via Load() before the test +// workload and compute the delta. +var ( + handledCount atomic.Int64 + fellThroughCount atomic.Int64 +) + +// HandledCount returns the cumulative number of vec dispatch successes +// observed by this process. +func HandledCount() int64 { return handledCount.Load() } + +// FellThroughCount returns the cumulative number of times Dispatch +// declined to handle a request (returned handled=false, err=nil) in +// this process. +func FellThroughCount() int64 { return fellThroughCount.Load() } + // Dispatch is the G8d top-level entry into the vec measure subsystem. // // Called from banyand/query/processor.go before the row-path Analyze runs. @@ -78,7 +102,20 @@ func Dispatch( logicalSchema logical.Schema, ec executor.MeasureExecutionContext, cfg measure.VectorizedConfig, -) (executor.MIterator, string, bool, error) { +) (iter executor.MIterator, planStr string, handled bool, err error) { + defer func() { + // Errors are surfaced as-is; only count clean handled / fall- + // through outcomes so observability matches the caller's + // branching contract. + if err != nil { + return + } + if handled { + handledCount.Add(1) + } else { + fellThroughCount.Add(1) + } + }() if !cfg.Enabled { return nil, "", false, nil } @@ -91,6 +128,14 @@ func Dispatch( // BatchTop. return nil, "", false, nil } + if req.GetOrderBy() != nil { + // The row path resolves order_by via the PushDownOrder optimizer + // rule (logical.NewPushDownOrder applied after Analyze). The vec + // dispatch does not invoke those rules, so it would silently + // drop OrderBy and return unsorted rows. Fall through until + // dispatch threads order_by into model.MeasureQueryOptions.Order. + return nil, "", false, nil + } if req.GetTimeRange() == nil { return nil, "", false, nil } @@ -101,6 +146,15 @@ func Dispatch( return nil, "", false, nil } + // Projection validation. The row path's Analyze rejects unknown + // projection names via ValidateProjectionTags / ValidateProjectionFields + // and surfaces a descriptive error. Dispatch falls through so the + // row path produces that canonical error (test fixtures with + // WantErr=true depend on it). + if !projectionsExistInSchema(req, measureSchema) { + return nil, "", false, nil + } + // Hidden-tag detection: criteria may reference tags that are NOT in // the projection (they're needed only as filter inputs). The row // path strips them at egress via hiddenTagsMIterator. v1 dispatch @@ -224,3 +278,51 @@ func projectedNames(tp *modelv1.TagProjection) map[string]struct{} { } return out } + +// projectionsExistInSchema returns false if any tag (in any requested tag +// family) or field name in the request's projection is absent from the +// Measure schema. Callers use the result as an eligibility gate: missing +// names route through the row path, which surfaces a descriptive error +// via logical_measure.Analyze. +func projectionsExistInSchema(req *measurev1.QueryRequest, m *databasev1.Measure) bool { + if tp := req.GetTagProjection(); tp != nil { + for _, reqFamily := range tp.GetTagFamilies() { + schemaFamily := findSchemaTagFamily(m, reqFamily.GetName()) + if schemaFamily == nil { + return false + } + known := make(map[string]struct{}, len(schemaFamily.GetTags())) + for _, ts := range schemaFamily.GetTags() { + known[ts.GetName()] = struct{}{} + } + for _, name := range reqFamily.GetTags() { + if _, ok := known[name]; !ok { + return false + } + } + } + } + if fp := req.GetFieldProjection(); fp != nil && len(fp.GetNames()) > 0 { + known := make(map[string]struct{}, len(m.GetFields())) + for _, fs := range m.GetFields() { + known[fs.GetName()] = struct{}{} + } + for _, name := range fp.GetNames() { + if _, ok := known[name]; !ok { + return false + } + } + } + return true +} + +// findSchemaTagFamily returns the schema-defined tag family with the +// given name, or nil if no such family exists. +func findSchemaTagFamily(m *databasev1.Measure, name string) *databasev1.TagFamilySpec { + for _, tf := range m.GetTagFamilies() { + if tf.GetName() == name { + return tf + } + } + return nil +} diff --git a/pkg/query/vectorized/measure/plan/dispatch_test.go b/pkg/query/vectorized/measure/plan/dispatch_test.go index ea5f9ae99..43288a93f 100644 --- a/pkg/query/vectorized/measure/plan/dispatch_test.go +++ b/pkg/query/vectorized/measure/plan/dispatch_test.go @@ -122,6 +122,83 @@ func TestDispatch_Top_FallsThrough(t *testing.T) { } } +// TestDispatch_OrderBy_FallsThrough covers the order_by gap. The row +// path resolves OrderBy via the PushDownOrder optimizer rule which the +// vec dispatch does not invoke. Until dispatch threads OrderBy into +// MeasureQueryOptions.Order, requests with OrderBy must fall through. +func TestDispatch_OrderBy_FallsThrough(t *testing.T) { + req := bareReq() + req.OrderBy = &modelv1.QueryOrder{ + Sort: modelv1.Sort_SORT_DESC, + } + _, _, handled, err := Dispatch(context.Background(), + req, nil, nil, nil, nil, dispatchCfg(true)) + if err != nil { + t.Fatalf("OrderBy fallthrough must not error: %v", err) + } + if handled { + t.Fatal("OrderBy must fall through (row path applies it via PushDownOrder)") + } +} + +// TestDispatch_UnknownTagProjection_FallsThrough covers the parity gap +// for WantErr=true fixtures: the row path rejects unknown tags via +// ValidateProjectionTags and returns a descriptive error. Dispatch +// falls through so the row path surfaces that canonical error. +func TestDispatch_UnknownTagProjection_FallsThrough(t *testing.T) { + measureSchema := testMeasureSchema() + // nolint:staticcheck // SA1019 — row-path BuildSchema is the only schema builder until G8 replaces it. + logicalSchema, schemaErr := logicalmeasure.BuildSchema(measureSchema, nil) + if schemaErr != nil { + t.Fatalf("BuildSchema: %v", schemaErr) + } + metadata := &commonv1.Metadata{Name: "demo", Group: "default"} + ec := &fakeEC{wantResult: nil, wantErr: nil} + + req := bareReq() + req.TagProjection = &modelv1.TagProjection{TagFamilies: []*modelv1.TagProjection_TagFamily{ + {Name: "default", Tags: []string{"ghost"}}, + }} + _, _, handled, err := Dispatch(context.Background(), + req, metadata, measureSchema, logicalSchema, ec, dispatchCfg(true)) + if err != nil { + t.Fatalf("unknown tag fallthrough must not error: %v", err) + } + if handled { + t.Fatal("unknown tag in projection must fall through (row path returns WantErr)") + } + if ec.called { + t.Fatal("ec.Query must not be invoked when projection is invalid") + } +} + +// TestDispatch_UnknownFieldProjection_FallsThrough is the field-side +// counterpart of UnknownTagProjection. +func TestDispatch_UnknownFieldProjection_FallsThrough(t *testing.T) { + measureSchema := testMeasureSchema() + // nolint:staticcheck // SA1019 — row-path BuildSchema is the only schema builder until G8 replaces it. + logicalSchema, schemaErr := logicalmeasure.BuildSchema(measureSchema, nil) + if schemaErr != nil { + t.Fatalf("BuildSchema: %v", schemaErr) + } + metadata := &commonv1.Metadata{Name: "demo", Group: "default"} + ec := &fakeEC{wantResult: nil, wantErr: nil} + + req := bareReq() + req.FieldProjection = &measurev1.QueryRequest_FieldProjection{Names: []string{"ghost"}} + _, _, handled, err := Dispatch(context.Background(), + req, metadata, measureSchema, logicalSchema, ec, dispatchCfg(true)) + if err != nil { + t.Fatalf("unknown field fallthrough must not error: %v", err) + } + if handled { + t.Fatal("unknown field in projection must fall through (row path returns WantErr)") + } + if ec.called { + t.Fatal("ec.Query must not be invoked when projection is invalid") + } +} + // TestDispatch_NoTimeRange_FallsThrough covers the bounded-window // requirement. func TestDispatch_NoTimeRange_FallsThrough(t *testing.T) { @@ -204,6 +281,45 @@ func TestDispatch_EmptyResult_FallsThrough(t *testing.T) { } } +// TestDispatch_Counters_TrackFellThroughCalls confirms the +// FellThroughCount counter increments on every non-error fallthrough. +// HandledCount must not move when dispatch declines. This is the unit- +// level half of the G8e parity-gate observability — integration runs +// assert HandledCount > 0 after replaying the measure/topn cases. +func TestDispatch_Counters_TrackFellThroughCalls(t *testing.T) { + startHandled := HandledCount() + startFellThrough := FellThroughCount() + + // Three fallthroughs of distinct shapes. + groupByReq := bareReq() + groupByReq.GroupBy = &measurev1.QueryRequest_GroupBy{ + TagProjection: projTagProj(), FieldName: "value", + } + groupByReq.Agg = &measurev1.QueryRequest_Aggregation{ + Function: modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM, FieldName: "value", + } + noTimeReq := bareReq() + noTimeReq.TimeRange = nil + + for _, req := range []*measurev1.QueryRequest{groupByReq, noTimeReq, bareReq() /* nil ec */} { + _, _, handled, dispatchErr := Dispatch(context.Background(), + req, nil, nil, nil, nil, dispatchCfg(true)) + if dispatchErr != nil { + t.Fatalf("fallthrough must not error: %v", dispatchErr) + } + if handled { + t.Fatal("test expected fallthrough; got handled=true") + } + } + + if got := HandledCount() - startHandled; got != 0 { + t.Fatalf("HandledCount delta: want 0, got %d", got) + } + if got := FellThroughCount() - startFellThrough; got != 3 { + t.Fatalf("FellThroughCount delta: want 3, got %d", got) + } +} + // TestDispatch_QueryError_BubblesUp covers the error propagation when // the storage query itself fails. Dispatch must report (nil, "", true, // err) so the caller surfaces the error rather than re-trying the row
