This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch vectorized-query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit ad99416573352e64fdbddd5cbf4ed03fd24fd030 Author: Hongtao Gao <[email protected]> AuthorDate: Wed May 13 22:48:50 2026 +0000 feat(query/vectorized/measure/plan): flag-gated GroupBy+Agg dispatch (G8d.2) The schema/storage bridge from the previous commit makes native columns flow from storage to BatchAggregation; this commit opens the dispatch eligibility gate to GroupBy+Agg requests — behind a feature flag so the integration parity suite stays green while the vec aggregation egress is engineered to match row-path output byte-for-byte. Adds VectorizedConfig.AggregationEnabled (default false). When true, plan.Dispatch handles GroupBy+Agg requests subject to: - GroupBy and Agg must travel as a pair. Scalar reduce (Agg alone) and raw groupby (GroupBy alone) fall through to the row path. - Projection coverage: GroupBy keys must appear in the request's TagProjection and the Agg field must appear in FieldProjection, so BatchAggregation can locate its input columns inside the BuildBatchSchema-emitted layout. When AggregationEnabled is false (default), GroupBy+Agg always falls through to the row path regardless of coverage. Top remains in the fallthrough gate (BatchTop's single-heap semantics still differ from the row path's per-timestamp top-N). Unit tests: - TestDispatch_GroupByAgg_FlagOff_FallsThrough pins the default-off behavior — even a fully covered request falls through. - TestDispatch_GroupByWithoutAgg_FallsThrough and TestDispatch_AggWithoutGroupBy_FallsThrough exercise the pair check with the flag on. - TestDispatch_GroupByAggUncoveredProjection_FallsThrough covers the projection-coverage gate (GroupBy tag not in TagProjection or Agg field not in FieldProjection). - TestDispatch_GroupByAggCovered_ReachesEcQuery confirms a covered request now invokes ec.Query (the gate is open).
Integration: full 488-spec suite passes with the flag off; with the flag on, 9 GroupBy+Agg cases fail with output-shape diffs (field naming / timestamp handling) — that parity engineering will land in a follow-up. --- pkg/query/vectorized/measure/config.go | 14 +- pkg/query/vectorized/measure/plan/dispatch.go | 96 +++++++++++- pkg/query/vectorized/measure/plan/dispatch_test.go | 169 ++++++++++++++++++--- 3 files changed, 252 insertions(+), 27 deletions(-) diff --git a/pkg/query/vectorized/measure/config.go b/pkg/query/vectorized/measure/config.go index b505c1a83..df46b947f 100644 --- a/pkg/query/vectorized/measure/config.go +++ b/pkg/query/vectorized/measure/config.go @@ -24,12 +24,20 @@ type VectorizedConfig struct { BatchSize int QueryMemoryMiB int Enabled bool + // AggregationEnabled opens the dispatch gate to GroupBy+Agg requests + // (G8d.2 ships the schema/storage bridge but parity-preserving + // aggregation egress is still being engineered). Default false so + // production keeps routing GroupBy+Agg through the row path until + // the egress contract matches row-path output byte-for-byte. + AggregationEnabled bool } // DefaultConfig returns the v1 default — enabled, 1024-row batches, 256 MiB -// per-query memory budget. v1 ships with Enabled true post-soak/bench-gate -// rollout (G5e). To roll back, pass --measure-vectorized-enabled=false on -// the standalone or data-node command line and restart. +// per-query memory budget, aggregation gate off. v1 ships Enabled true post- +// soak/bench-gate rollout (G5e); AggregationEnabled stays false until the +// vec aggregation egress reaches row-path parity. To roll back the vec +// path entirely, pass --measure-vectorized-enabled=false on the standalone +// or data-node command line and restart. 
func DefaultConfig() VectorizedConfig { return VectorizedConfig{Enabled: true, BatchSize: 1024, QueryMemoryMiB: 256} } diff --git a/pkg/query/vectorized/measure/plan/dispatch.go b/pkg/query/vectorized/measure/plan/dispatch.go index 4eaeca31b..8ec0f9ee0 100644 --- a/pkg/query/vectorized/measure/plan/dispatch.go +++ b/pkg/query/vectorized/measure/plan/dispatch.go @@ -122,12 +122,39 @@ func Dispatch( if req == nil { return nil, "", false, nil } - if req.GetGroupBy() != nil || req.GetAgg() != nil || req.GetTop() != nil { - // G8d.2 will lift GroupBy/Agg once the scan-source column type - // bridging is in place. Top awaits per-timestamp partitioning of - // BatchTop. + if req.GetTop() != nil { + // Top awaits per-timestamp partitioning of BatchTop; the row + // path's TopN semantic differs from BatchTop's single-heap + // today. return nil, "", false, nil } + hasGroupBy := req.GetGroupBy() != nil + hasAgg := req.GetAgg() != nil + if hasGroupBy || hasAgg { + // G8d.2 wires the schema/storage bridge (BuildBatchSchema emits + // native columns for GroupBy keys + Agg field, storage decoders + // honor it) so the operator pipeline is ready when the egress + // reaches row-path parity. Until then, AggregationEnabled gates + // the dispatch gate; default false keeps GroupBy+Agg on the row + // path so the parity suite stays green. + if !cfg.AggregationEnabled { + return nil, "", false, nil + } + // GroupBy and Agg must travel as a pair (scalar reduce + raw + // groupby are deferred). Either alone falls through. + if hasGroupBy != hasAgg { + return nil, "", false, nil + } + // Projection coverage: BatchAggregation locates its key + value + // columns by name inside the BatchSchema, which is built from + // TagProjection + FieldProjection. Missing coverage means the + // operator would fail at construction; fall through so the row + // path can extend projection implicitly or surface its + // canonical error. 
+ if !aggProjectionCoverage(req) { + return nil, "", false, nil + } + } if req.GetOrderBy() != nil { // The row path resolves order_by via the PushDownOrder optimizer // rule (logical.NewPushDownOrder applied after Analyze). The vec @@ -326,3 +353,64 @@ func findSchemaTagFamily(m *databasev1.Measure, name string) *databasev1.TagFami } return nil } + +// aggProjectionCoverage reports whether the request's GroupBy keys and +// Agg field are all present in the request's projections. Required by +// the dispatch eligibility gate: the BatchAggregation operator locates +// its key + value columns by name inside the BatchSchema, and the +// BatchSchema is built from TagProjection + FieldProjection. Missing +// coverage means the operator would fail at construction; dispatch +// instead falls through so the row path can handle the request. +// +// v1 requires GroupBy.tag_projection to name a single family; that +// family must appear in req.TagProjection with every tag in GroupBy +// present. Agg.field_name must appear in req.FieldProjection. +func aggProjectionCoverage(req *measurev1.QueryRequest) bool { + gb := req.GetGroupBy() + if gb == nil { + return false + } + gbFamilies := gb.GetTagProjection().GetTagFamilies() + if len(gbFamilies) != 1 { + return false + } + gbFamily := gbFamilies[0] + projected := projectedTagsByFamily(req.GetTagProjection()) + present, ok := projected[gbFamily.GetName()] + if !ok { + return false + } + for _, name := range gbFamily.GetTags() { + if _, hit := present[name]; !hit { + return false + } + } + // Agg field must be in FieldProjection. + aggField := req.GetAgg().GetFieldName() + if aggField == "" { + return false + } + for _, name := range req.GetFieldProjection().GetNames() { + if name == aggField { + return true + } + } + return false +} + +// projectedTagsByFamily flattens a TagProjection into family → name-set. 
+func projectedTagsByFamily(tp *modelv1.TagProjection) map[string]map[string]struct{} { + out := make(map[string]map[string]struct{}) + if tp == nil { + return out + } + for _, tf := range tp.GetTagFamilies() { + family := tf.GetName() + names := make(map[string]struct{}, len(tf.GetTags())) + for _, n := range tf.GetTags() { + names[n] = struct{}{} + } + out[family] = names + } + return out +} diff --git a/pkg/query/vectorized/measure/plan/dispatch_test.go b/pkg/query/vectorized/measure/plan/dispatch_test.go index 43288a93f..c8a8a2532 100644 --- a/pkg/query/vectorized/measure/plan/dispatch_test.go +++ b/pkg/query/vectorized/measure/plan/dispatch_test.go @@ -36,6 +36,14 @@ func dispatchCfg(enabled bool) measure.VectorizedConfig { return measure.VectorizedConfig{Enabled: enabled, BatchSize: 1024, QueryMemoryMiB: 16} } +// dispatchCfgWithAgg returns a vec-enabled config with the G8d.2 +// aggregation gate flipped on so dispatch handles GroupBy+Agg requests. +func dispatchCfgWithAgg() measure.VectorizedConfig { + cfg := dispatchCfg(true) + cfg.AggregationEnabled = true + return cfg +} + func bareReq() *measurev1.QueryRequest { return &measurev1.QueryRequest{ Name: "demo", @@ -65,8 +73,12 @@ func TestDispatch_NotEnabled_FallsThrough(t *testing.T) { } } -// TestDispatch_GroupBy_FallsThrough covers the column-bridging gate. -func TestDispatch_GroupBy_FallsThrough(t *testing.T) { +// TestDispatch_GroupByAgg_FlagOff_FallsThrough pins the G8d.2 default: +// even a fully-covered GroupBy+Agg request falls through when +// AggregationEnabled is false (the default). This is the load-bearing +// behavior that keeps the integration parity suite green while the +// vec aggregation egress is engineered to match row-path output. 
+func TestDispatch_GroupByAgg_FlagOff_FallsThrough(t *testing.T) { req := bareReq() req.GroupBy = &measurev1.QueryRequest_GroupBy{ TagProjection: projTagProj(), @@ -77,30 +89,110 @@ func TestDispatch_GroupBy_FallsThrough(t *testing.T) { FieldName: "value", } _, _, handled, err := Dispatch(context.Background(), - req, nil, nil, nil, nil, dispatchCfg(true)) + req, nil, nil, nil, nil, dispatchCfg(true)) // AggregationEnabled = false (default) if err != nil { - t.Fatalf("GroupBy fallthrough must not error: %v", err) + t.Fatalf("flag-off fallthrough must not error: %v", err) } if handled { - t.Fatal("GroupBy+Agg must fall through to row path in G8d.1") + t.Fatal("GroupBy+Agg must fall through when AggregationEnabled is false") } } -// TestDispatch_Agg_FallsThrough covers Agg-without-GroupBy (which the -// analyzer would reject, but the dispatch gate fires before Analyze). -func TestDispatch_Agg_FallsThrough(t *testing.T) { +// TestDispatch_GroupByWithoutAgg_FallsThrough covers the pair check +// (GroupBy and Agg must travel together). Uses the agg-enabled config +// so we reach the pair check rather than the flag-off gate. +func TestDispatch_GroupByWithoutAgg_FallsThrough(t *testing.T) { + req := bareReq() + req.GroupBy = &measurev1.QueryRequest_GroupBy{ + TagProjection: projTagProj(), + FieldName: "value", + } + _, _, handled, err := Dispatch(context.Background(), + req, nil, nil, nil, nil, dispatchCfgWithAgg()) + if err != nil { + t.Fatalf("GroupBy-without-Agg fallthrough must not error: %v", err) + } + if handled { + t.Fatal("GroupBy without Agg must fall through to row path") + } +} + +// TestDispatch_AggWithoutGroupBy_FallsThrough is the Agg-only counterpart. +// Scalar reduce is deferred; Agg without GroupBy falls through. 
+func TestDispatch_AggWithoutGroupBy_FallsThrough(t *testing.T) { req := bareReq() req.Agg = &measurev1.QueryRequest_Aggregation{ Function: modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM, FieldName: "value", } _, _, handled, err := Dispatch(context.Background(), - req, nil, nil, nil, nil, dispatchCfg(true)) + req, nil, nil, nil, nil, dispatchCfgWithAgg()) if err != nil { - t.Fatalf("Agg fallthrough must not error: %v", err) + t.Fatalf("Agg-without-GroupBy fallthrough must not error: %v", err) } if handled { - t.Fatal("Agg must fall through to row path in G8d.1") + t.Fatal("Agg without GroupBy must fall through to row path") + } +} + +// TestDispatch_GroupByAggUncoveredProjection_FallsThrough covers the +// G8d.2 projection-coverage gate. BatchAggregation locates its key + +// value columns by name inside the BatchSchema, which is built from +// TagProjection + FieldProjection. When GroupBy keys or the Agg field +// are not in the request's projection, dispatch falls through so the +// row path can either extend projection implicitly or surface its +// canonical error. +func TestDispatch_GroupByAggUncoveredProjection_FallsThrough(t *testing.T) { + cases := []struct { + name string + mutate func(*measurev1.QueryRequest) + comment string + }{ + { + name: "groupby_tag_not_in_projection", + mutate: func(req *measurev1.QueryRequest) { + // GroupBy references "region" but TagProjection only carries "svc". 
+ req.GroupBy = &measurev1.QueryRequest_GroupBy{ + TagProjection: &modelv1.TagProjection{TagFamilies: []*modelv1.TagProjection_TagFamily{ + {Name: "default", Tags: []string{"region"}}, + }}, + FieldName: "value", + } + req.Agg = &measurev1.QueryRequest_Aggregation{ + Function: modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM, + FieldName: "value", + } + }, + }, + { + name: "agg_field_not_in_projection", + mutate: func(req *measurev1.QueryRequest) { + req.GroupBy = &measurev1.QueryRequest_GroupBy{ + TagProjection: projTagProj(), + FieldName: "value", + } + req.Agg = &measurev1.QueryRequest_Aggregation{ + Function: modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM, + FieldName: "value", + } + // Strip "value" from FieldProjection so the Agg field is uncovered. + req.FieldProjection = nil + }, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + req := bareReq() + c.mutate(req) + _, _, handled, err := Dispatch(context.Background(), + req, nil, nil, nil, nil, dispatchCfgWithAgg()) + if err != nil { + t.Fatalf("uncovered projection must not error: %v", err) + } + if handled { + t.Fatal("uncovered GroupBy / Agg projection must fall through") + } + }) } } @@ -290,18 +382,14 @@ func TestDispatch_Counters_TrackFellThroughCalls(t *testing.T) { startHandled := HandledCount() startFellThrough := FellThroughCount() - // Three fallthroughs of distinct shapes. - groupByReq := bareReq() - groupByReq.GroupBy = &measurev1.QueryRequest_GroupBy{ - TagProjection: projTagProj(), FieldName: "value", - } - groupByReq.Agg = &measurev1.QueryRequest_Aggregation{ - Function: modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM, FieldName: "value", - } + // Three fallthroughs of distinct shapes, each tripping a different + // gate so the counter is exercised across the eligibility branches. 
+ topReq := bareReq() + topReq.Top = &measurev1.QueryRequest_Top{Number: 5, FieldName: "value"} noTimeReq := bareReq() noTimeReq.TimeRange = nil - for _, req := range []*measurev1.QueryRequest{groupByReq, noTimeReq, bareReq() /* nil ec */} { + for _, req := range []*measurev1.QueryRequest{topReq, noTimeReq, bareReq() /* nil ec */} { _, _, handled, dispatchErr := Dispatch(context.Background(), req, nil, nil, nil, nil, dispatchCfg(true)) if dispatchErr != nil { @@ -320,6 +408,47 @@ func TestDispatch_Counters_TrackFellThroughCalls(t *testing.T) { } } +// TestDispatch_GroupByAggCovered_ReachesEcQuery confirms G8d.2 has +// opened the dispatch gate for GroupBy+Agg requests whose projection +// covers both the GroupBy keys and the Agg field. The fakeEC returns +// nil so dispatch falls through after ec.Query (matching the empty- +// result branch) — what matters is that ec.Query was invoked at all, +// proving the eligibility gate let the request through. With G8d.1 +// this would have been rejected before any ec interaction. +func TestDispatch_GroupByAggCovered_ReachesEcQuery(t *testing.T) { + measureSchema := testMeasureSchema() + // nolint:staticcheck // SA1019 — row-path BuildSchema is the only schema builder until G8 replaces it. 
+ logicalSchema, schemaErr := logicalmeasure.BuildSchema(measureSchema, nil) + if schemaErr != nil { + t.Fatalf("BuildSchema: %v", schemaErr) + } + metadata := &commonv1.Metadata{Name: "demo", Group: "default"} + ec := &fakeEC{wantResult: nil, wantErr: nil} + + req := bareReq() + req.GroupBy = &measurev1.QueryRequest_GroupBy{ + TagProjection: projTagProj(), + FieldName: "value", + } + req.Agg = &measurev1.QueryRequest_Aggregation{ + Function: modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM, + FieldName: "value", + } + + iter, planStr, handled, err := Dispatch(context.Background(), + req, metadata, measureSchema, logicalSchema, ec, dispatchCfgWithAgg()) + if err != nil { + t.Fatalf("covered GroupBy+Agg must not error before ec.Query: %v", err) + } + if !ec.called { + // Surface the other return values for debugging when the gate + // regression resurfaces — they're all zero-valued today because + // ec.Query returned (nil, nil) and dispatch fell through. + t.Fatalf("covered GroupBy+Agg must reach ec.Query (G8d.2 gate); "+ + "got iter=%v planStr=%q handled=%v", iter, planStr, handled) + } +} + // TestDispatch_QueryError_BubblesUp covers the error propagation when // the storage query itself fails. Dispatch must report (nil, "", true, // err) so the caller surfaces the error rather than re-trying the row
