This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch vectorized-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit ad99416573352e64fdbddd5cbf4ed03fd24fd030
Author: Hongtao Gao <[email protected]>
AuthorDate: Wed May 13 22:48:50 2026 +0000

    feat(query/vectorized/measure/plan): flag-gated GroupBy+Agg dispatch (G8d.2)
    
    The schema/storage bridge from the previous commit makes native
    columns flow from storage to BatchAggregation; this commit opens the
    dispatch eligibility gate to GroupBy+Agg requests, behind a feature
    flag so the integration parity suite stays green while the
    vectorized (vec) aggregation egress is engineered to match row-path
    output byte-for-byte.
    
    Adds VectorizedConfig.AggregationEnabled (default false). When true,
    plan.Dispatch handles GroupBy+Agg requests subject to:
    
      - GroupBy and Agg must travel as a pair. Scalar reduce (Agg alone)
        and raw groupby (GroupBy alone) fall through to the row path.
      - Projection coverage: GroupBy keys must appear in the request's
        TagProjection and the Agg field must appear in FieldProjection,
        so BatchAggregation can locate its input columns inside the
        BuildBatchSchema-emitted layout.
    
    When AggregationEnabled is false (default), GroupBy+Agg always falls
    through to the row path regardless of coverage. Top requests still
    always fall through (BatchTop's single-heap semantics still differ
    from the row path's per-timestamp top-N).
    
    Unit tests:
      - TestDispatch_GroupByAgg_FlagOff_FallsThrough pins the default-off
        behavior — even a fully covered request falls through.
      - TestDispatch_GroupByWithoutAgg_FallsThrough and
        TestDispatch_AggWithoutGroupBy_FallsThrough exercise the pair
        check with the flag on.
      - TestDispatch_GroupByAggUncoveredProjection_FallsThrough covers
        the projection-coverage gate (groupby tag not in TagProjection
        or agg field not in FieldProjection).
      - TestDispatch_GroupByAggCovered_ReachesEcQuery confirms a covered
        request now invokes ec.Query (the gate is open).
    
    Integration: the full 488-spec suite passes with the flag off; with
    the flag on, 9 GroupBy+Agg cases fail with output-shape differences
    (field naming / timestamp handling). That parity work will land in a
    follow-up commit.
---
 pkg/query/vectorized/measure/config.go             |  14 +-
 pkg/query/vectorized/measure/plan/dispatch.go      |  96 +++++++++++-
 pkg/query/vectorized/measure/plan/dispatch_test.go | 169 ++++++++++++++++++---
 3 files changed, 252 insertions(+), 27 deletions(-)

diff --git a/pkg/query/vectorized/measure/config.go 
b/pkg/query/vectorized/measure/config.go
index b505c1a83..df46b947f 100644
--- a/pkg/query/vectorized/measure/config.go
+++ b/pkg/query/vectorized/measure/config.go
@@ -24,12 +24,20 @@ type VectorizedConfig struct {
        BatchSize      int
        QueryMemoryMiB int
        Enabled        bool
+       // AggregationEnabled opens the dispatch gate to GroupBy+Agg requests
+       // (G8d.2 ships the schema/storage bridge but parity-preserving
+       // aggregation egress is still being engineered). Default false so
+       // production keeps routing GroupBy+Agg through the row path until
+       // the egress contract matches row-path output byte-for-byte.
+       AggregationEnabled bool
 }
 
 // DefaultConfig returns the v1 default — enabled, 1024-row batches, 256 MiB
-// per-query memory budget. v1 ships with Enabled true post-soak/bench-gate
-// rollout (G5e). To roll back, pass --measure-vectorized-enabled=false on
-// the standalone or data-node command line and restart.
+// per-query memory budget, aggregation gate off. v1 ships Enabled true post-
+// soak/bench-gate rollout (G5e); AggregationEnabled stays false until the
+// vec aggregation egress reaches row-path parity. To roll back the vec
+// path entirely, pass --measure-vectorized-enabled=false on the standalone
+// or data-node command line and restart.
 func DefaultConfig() VectorizedConfig {
        return VectorizedConfig{Enabled: true, BatchSize: 1024, QueryMemoryMiB: 
256}
 }
diff --git a/pkg/query/vectorized/measure/plan/dispatch.go 
b/pkg/query/vectorized/measure/plan/dispatch.go
index 4eaeca31b..8ec0f9ee0 100644
--- a/pkg/query/vectorized/measure/plan/dispatch.go
+++ b/pkg/query/vectorized/measure/plan/dispatch.go
@@ -122,12 +122,39 @@ func Dispatch(
        if req == nil {
                return nil, "", false, nil
        }
-       if req.GetGroupBy() != nil || req.GetAgg() != nil || req.GetTop() != 
nil {
-               // G8d.2 will lift GroupBy/Agg once the scan-source column type
-               // bridging is in place. Top awaits per-timestamp partitioning 
of
-               // BatchTop.
+       if req.GetTop() != nil {
+               // Top awaits per-timestamp partitioning of BatchTop; the row
+               // path's TopN semantic differs from BatchTop's single-heap
+               // today.
                return nil, "", false, nil
        }
+       hasGroupBy := req.GetGroupBy() != nil
+       hasAgg := req.GetAgg() != nil
+       if hasGroupBy || hasAgg {
+               // G8d.2 wires the schema/storage bridge (BuildBatchSchema emits
+               // native columns for GroupBy keys + Agg field, storage decoders
+               // honor it) so the operator pipeline is ready when the egress
+               // reaches row-path parity. Until then, AggregationEnabled gates
+               // the dispatch gate; default false keeps GroupBy+Agg on the row
+               // path so the parity suite stays green.
+               if !cfg.AggregationEnabled {
+                       return nil, "", false, nil
+               }
+               // GroupBy and Agg must travel as a pair (scalar reduce + raw
+               // groupby are deferred). Either alone falls through.
+               if hasGroupBy != hasAgg {
+                       return nil, "", false, nil
+               }
+               // Projection coverage: BatchAggregation locates its key + value
+               // columns by name inside the BatchSchema, which is built from
+               // TagProjection + FieldProjection. Missing coverage means the
+               // operator would fail at construction; fall through so the row
+               // path can extend projection implicitly or surface its
+               // canonical error.
+               if !aggProjectionCoverage(req) {
+                       return nil, "", false, nil
+               }
+       }
        if req.GetOrderBy() != nil {
                // The row path resolves order_by via the PushDownOrder 
optimizer
                // rule (logical.NewPushDownOrder applied after Analyze). The 
vec
@@ -326,3 +353,64 @@ func findSchemaTagFamily(m *databasev1.Measure, name 
string) *databasev1.TagFami
        }
        return nil
 }
+
+// aggProjectionCoverage reports whether the request's GroupBy keys and
+// Agg field are all present in the request's projections. Required by
+// the dispatch eligibility gate: the BatchAggregation operator locates
+// its key + value columns by name inside the BatchSchema, and the
+// BatchSchema is built from TagProjection + FieldProjection. Missing
+// coverage means the operator would fail at construction; dispatch
+// instead falls through so the row path can handle the request.
+//
+// v1 requires GroupBy.tag_projection to name a single family; that
+// family must appear in req.TagProjection with every tag in GroupBy
+// present. Agg.field_name must appear in req.FieldProjection.
+func aggProjectionCoverage(req *measurev1.QueryRequest) bool {
+       gb := req.GetGroupBy()
+       if gb == nil {
+               return false
+       }
+       gbFamilies := gb.GetTagProjection().GetTagFamilies()
+       if len(gbFamilies) != 1 {
+               return false
+       }
+       gbFamily := gbFamilies[0]
+       projected := projectedTagsByFamily(req.GetTagProjection())
+       present, ok := projected[gbFamily.GetName()]
+       if !ok {
+               return false
+       }
+       for _, name := range gbFamily.GetTags() {
+               if _, hit := present[name]; !hit {
+                       return false
+               }
+       }
+       // Agg field must be in FieldProjection.
+       aggField := req.GetAgg().GetFieldName()
+       if aggField == "" {
+               return false
+       }
+       for _, name := range req.GetFieldProjection().GetNames() {
+               if name == aggField {
+                       return true
+               }
+       }
+       return false
+}
+
+// projectedTagsByFamily flattens a TagProjection into family → name-set.
+func projectedTagsByFamily(tp *modelv1.TagProjection) 
map[string]map[string]struct{} {
+       out := make(map[string]map[string]struct{})
+       if tp == nil {
+               return out
+       }
+       for _, tf := range tp.GetTagFamilies() {
+               family := tf.GetName()
+               names := make(map[string]struct{}, len(tf.GetTags()))
+               for _, n := range tf.GetTags() {
+                       names[n] = struct{}{}
+               }
+               out[family] = names
+       }
+       return out
+}
diff --git a/pkg/query/vectorized/measure/plan/dispatch_test.go 
b/pkg/query/vectorized/measure/plan/dispatch_test.go
index 43288a93f..c8a8a2532 100644
--- a/pkg/query/vectorized/measure/plan/dispatch_test.go
+++ b/pkg/query/vectorized/measure/plan/dispatch_test.go
@@ -36,6 +36,14 @@ func dispatchCfg(enabled bool) measure.VectorizedConfig {
        return measure.VectorizedConfig{Enabled: enabled, BatchSize: 1024, 
QueryMemoryMiB: 16}
 }
 
+// dispatchCfgWithAgg returns a vec-enabled config with the G8d.2
+// aggregation gate flipped on so dispatch handles GroupBy+Agg requests.
+func dispatchCfgWithAgg() measure.VectorizedConfig {
+       cfg := dispatchCfg(true)
+       cfg.AggregationEnabled = true
+       return cfg
+}
+
 func bareReq() *measurev1.QueryRequest {
        return &measurev1.QueryRequest{
                Name:            "demo",
@@ -65,8 +73,12 @@ func TestDispatch_NotEnabled_FallsThrough(t *testing.T) {
        }
 }
 
-// TestDispatch_GroupBy_FallsThrough covers the column-bridging gate.
-func TestDispatch_GroupBy_FallsThrough(t *testing.T) {
+// TestDispatch_GroupByAgg_FlagOff_FallsThrough pins the G8d.2 default:
+// even a fully-covered GroupBy+Agg request falls through when
+// AggregationEnabled is false (the default). This is the load-bearing
+// behavior that keeps the integration parity suite green while the
+// vec aggregation egress is engineered to match row-path output.
+func TestDispatch_GroupByAgg_FlagOff_FallsThrough(t *testing.T) {
        req := bareReq()
        req.GroupBy = &measurev1.QueryRequest_GroupBy{
                TagProjection: projTagProj(),
@@ -77,30 +89,110 @@ func TestDispatch_GroupBy_FallsThrough(t *testing.T) {
                FieldName: "value",
        }
        _, _, handled, err := Dispatch(context.Background(),
-               req, nil, nil, nil, nil, dispatchCfg(true))
+               req, nil, nil, nil, nil, dispatchCfg(true)) // 
AggregationEnabled = false (default)
        if err != nil {
-               t.Fatalf("GroupBy fallthrough must not error: %v", err)
+               t.Fatalf("flag-off fallthrough must not error: %v", err)
        }
        if handled {
-               t.Fatal("GroupBy+Agg must fall through to row path in G8d.1")
+               t.Fatal("GroupBy+Agg must fall through when AggregationEnabled 
is false")
        }
 }
 
-// TestDispatch_Agg_FallsThrough covers Agg-without-GroupBy (which the
-// analyzer would reject, but the dispatch gate fires before Analyze).
-func TestDispatch_Agg_FallsThrough(t *testing.T) {
+// TestDispatch_GroupByWithoutAgg_FallsThrough covers the pair check
+// (GroupBy and Agg must travel together). Uses the agg-enabled config
+// so we reach the pair check rather than the flag-off gate.
+func TestDispatch_GroupByWithoutAgg_FallsThrough(t *testing.T) {
+       req := bareReq()
+       req.GroupBy = &measurev1.QueryRequest_GroupBy{
+               TagProjection: projTagProj(),
+               FieldName:     "value",
+       }
+       _, _, handled, err := Dispatch(context.Background(),
+               req, nil, nil, nil, nil, dispatchCfgWithAgg())
+       if err != nil {
+               t.Fatalf("GroupBy-without-Agg fallthrough must not error: %v", 
err)
+       }
+       if handled {
+               t.Fatal("GroupBy without Agg must fall through to row path")
+       }
+}
+
+// TestDispatch_AggWithoutGroupBy_FallsThrough is the Agg-only counterpart.
+// Scalar reduce is deferred; Agg without GroupBy falls through.
+func TestDispatch_AggWithoutGroupBy_FallsThrough(t *testing.T) {
        req := bareReq()
        req.Agg = &measurev1.QueryRequest_Aggregation{
                Function:  modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM,
                FieldName: "value",
        }
        _, _, handled, err := Dispatch(context.Background(),
-               req, nil, nil, nil, nil, dispatchCfg(true))
+               req, nil, nil, nil, nil, dispatchCfgWithAgg())
        if err != nil {
-               t.Fatalf("Agg fallthrough must not error: %v", err)
+               t.Fatalf("Agg-without-GroupBy fallthrough must not error: %v", 
err)
        }
        if handled {
-               t.Fatal("Agg must fall through to row path in G8d.1")
+               t.Fatal("Agg without GroupBy must fall through to row path")
+       }
+}
+
+// TestDispatch_GroupByAggUncoveredProjection_FallsThrough covers the
+// G8d.2 projection-coverage gate. BatchAggregation locates its key +
+// value columns by name inside the BatchSchema, which is built from
+// TagProjection + FieldProjection. When GroupBy keys or the Agg field
+// are not in the request's projection, dispatch falls through so the
+// row path can either extend projection implicitly or surface its
+// canonical error.
+func TestDispatch_GroupByAggUncoveredProjection_FallsThrough(t *testing.T) {
+       cases := []struct {
+               name    string
+               mutate  func(*measurev1.QueryRequest)
+               comment string
+       }{
+               {
+                       name: "groupby_tag_not_in_projection",
+                       mutate: func(req *measurev1.QueryRequest) {
+                               // GroupBy references "region" but 
TagProjection only carries "svc".
+                               req.GroupBy = &measurev1.QueryRequest_GroupBy{
+                                       TagProjection: 
&modelv1.TagProjection{TagFamilies: []*modelv1.TagProjection_TagFamily{
+                                               {Name: "default", Tags: 
[]string{"region"}},
+                                       }},
+                                       FieldName: "value",
+                               }
+                               req.Agg = &measurev1.QueryRequest_Aggregation{
+                                       Function:  
modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM,
+                                       FieldName: "value",
+                               }
+                       },
+               },
+               {
+                       name: "agg_field_not_in_projection",
+                       mutate: func(req *measurev1.QueryRequest) {
+                               req.GroupBy = &measurev1.QueryRequest_GroupBy{
+                                       TagProjection: projTagProj(),
+                                       FieldName:     "value",
+                               }
+                               req.Agg = &measurev1.QueryRequest_Aggregation{
+                                       Function:  
modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM,
+                                       FieldName: "value",
+                               }
+                               // Strip "value" from FieldProjection so the 
Agg field is uncovered.
+                               req.FieldProjection = nil
+                       },
+               },
+       }
+       for _, c := range cases {
+               t.Run(c.name, func(t *testing.T) {
+                       req := bareReq()
+                       c.mutate(req)
+                       _, _, handled, err := Dispatch(context.Background(),
+                               req, nil, nil, nil, nil, dispatchCfgWithAgg())
+                       if err != nil {
+                               t.Fatalf("uncovered projection must not error: 
%v", err)
+                       }
+                       if handled {
+                               t.Fatal("uncovered GroupBy / Agg projection 
must fall through")
+                       }
+               })
        }
 }
 
@@ -290,18 +382,14 @@ func TestDispatch_Counters_TrackFellThroughCalls(t 
*testing.T) {
        startHandled := HandledCount()
        startFellThrough := FellThroughCount()
 
-       // Three fallthroughs of distinct shapes.
-       groupByReq := bareReq()
-       groupByReq.GroupBy = &measurev1.QueryRequest_GroupBy{
-               TagProjection: projTagProj(), FieldName: "value",
-       }
-       groupByReq.Agg = &measurev1.QueryRequest_Aggregation{
-               Function: modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM, 
FieldName: "value",
-       }
+       // Three fallthroughs of distinct shapes, each tripping a different
+       // gate so the counter is exercised across the eligibility branches.
+       topReq := bareReq()
+       topReq.Top = &measurev1.QueryRequest_Top{Number: 5, FieldName: "value"}
        noTimeReq := bareReq()
        noTimeReq.TimeRange = nil
 
-       for _, req := range []*measurev1.QueryRequest{groupByReq, noTimeReq, 
bareReq() /* nil ec */} {
+       for _, req := range []*measurev1.QueryRequest{topReq, noTimeReq, 
bareReq() /* nil ec */} {
                _, _, handled, dispatchErr := Dispatch(context.Background(),
                        req, nil, nil, nil, nil, dispatchCfg(true))
                if dispatchErr != nil {
@@ -320,6 +408,47 @@ func TestDispatch_Counters_TrackFellThroughCalls(t 
*testing.T) {
        }
 }
 
+// TestDispatch_GroupByAggCovered_ReachesEcQuery confirms G8d.2 has
+// opened the dispatch gate for GroupBy+Agg requests whose projection
+// covers both the GroupBy keys and the Agg field. The fakeEC returns
+// nil so dispatch falls through after ec.Query (matching the empty-
+// result branch) — what matters is that ec.Query was invoked at all,
+// proving the eligibility gate let the request through. With G8d.1
+// this would have been rejected before any ec interaction.
+func TestDispatch_GroupByAggCovered_ReachesEcQuery(t *testing.T) {
+       measureSchema := testMeasureSchema()
+       // nolint:staticcheck // SA1019 — row-path BuildSchema is the only 
schema builder until G8 replaces it.
+       logicalSchema, schemaErr := logicalmeasure.BuildSchema(measureSchema, 
nil)
+       if schemaErr != nil {
+               t.Fatalf("BuildSchema: %v", schemaErr)
+       }
+       metadata := &commonv1.Metadata{Name: "demo", Group: "default"}
+       ec := &fakeEC{wantResult: nil, wantErr: nil}
+
+       req := bareReq()
+       req.GroupBy = &measurev1.QueryRequest_GroupBy{
+               TagProjection: projTagProj(),
+               FieldName:     "value",
+       }
+       req.Agg = &measurev1.QueryRequest_Aggregation{
+               Function:  modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM,
+               FieldName: "value",
+       }
+
+       iter, planStr, handled, err := Dispatch(context.Background(),
+               req, metadata, measureSchema, logicalSchema, ec, 
dispatchCfgWithAgg())
+       if err != nil {
+               t.Fatalf("covered GroupBy+Agg must not error before ec.Query: 
%v", err)
+       }
+       if !ec.called {
+               // Surface the other return values for debugging when the gate
+               // regression resurfaces — they're all zero-valued today because
+               // ec.Query returned (nil, nil) and dispatch fell through.
+               t.Fatalf("covered GroupBy+Agg must reach ec.Query (G8d.2 gate); 
"+
+                       "got iter=%v planStr=%q handled=%v", iter, planStr, 
handled)
+       }
+}
+
 // TestDispatch_QueryError_BubblesUp covers the error propagation when
 // the storage query itself fails. Dispatch must report (nil, "", true,
 // err) so the caller surfaces the error rather than re-trying the row

Reply via email to