This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch vectorized-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 32fb720a376d62a032a87c07e9f9daf509aa2f0e
Author: Hongtao Gao <[email protected]>
AuthorDate: Thu May 14 07:20:47 2026 +0000

    feat(query/vectorized/measure)!: remove AggregationEnabled flag — GroupBy+Agg dispatch is default-on
    
    The flag served its purpose: it was a kill-switch while the operator and
    bridge were being engineered toward row-path parity. With the integration
    parity gate at 488/488 specs green, the kill-switch is dead weight. The
    only reason to keep it would be a known code path that users could trip
    into where the vectorized result diverges from the row path. There isn't one.
    
    Drops:
      - VectorizedConfig.AggregationEnabled field + its plumbing
      - --measure-vectorized-aggregation-enabled CLI flag in banyand/measure
      - the cfg.AggregationEnabled gate in dispatch.Dispatch (lines 134-142)
      - TestDispatch_GroupByAgg_FlagOff_FallsThrough + the
        dispatchCfgWithAgg helper that pinned the flag's behavior
      - --measure-vectorized-aggregation-enabled=true from the parity-gate
        standalone setup (the only place the flag was set)
      - the satisfied TODO(G8d) in executor_test.go that asked for the
        operator-pipeline bridge that has now landed
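      - stale doc fragments in banyand/measure/measure.go,
        banyand/measure/query.go, config.go, dispatch.go's eligibility block,
        and dispatch_test.go that described the flag or referenced the
        G8d/G8d.2 milestones, including the "off by default; flip after soak"
        help text on --measure-vectorized-enabled, which contradicted
        DefaultConfig's Enabled: true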
    
    The rollback path remains --measure-vectorized-enabled=false, which
    disables the entire vectorized subsystem. GroupBy+Agg can no longer be
    disabled on its own.
    
    via [HAPI](https://hapi.run)
---
 banyand/measure/measure.go                         | 10 +---
 banyand/measure/query.go                           | 10 ++--
 pkg/query/vectorized/measure/config.go             | 14 ++---
 pkg/query/vectorized/measure/plan/dispatch.go      | 13 +----
 pkg/query/vectorized/measure/plan/dispatch_test.go | 59 +++++-----------------
 pkg/query/vectorized/measure/plan/executor_test.go | 13 -----
 .../standalone/query/vectorized_test.go            |  1 -
 7 files changed, 24 insertions(+), 96 deletions(-)

diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go
index a797156cc..76a8c1ae3 100644
--- a/banyand/measure/measure.go
+++ b/banyand/measure/measure.go
@@ -174,19 +174,13 @@ func (m *measure) VectorizedConfig() 
vmeasure.VectorizedConfig {
 }
 
 // bindVectorizedFlags wires VectorizedConfig fields to a run.FlagSet. The
-// defaults match vmeasure.DefaultConfig — Enabled=false, BatchSize=1024,
-// QueryMemoryMiB=256 — so config loaders that omit these flags retain the
-// pre-G4 row-only behavior.
+// defaults match vmeasure.DefaultConfig.
 func bindVectorizedFlags(flagS *run.FlagSet, cfg *vmeasure.VectorizedConfig) {
        defaults := vmeasure.DefaultConfig()
        flagS.BoolVar(&cfg.Enabled, "measure-vectorized-enabled", 
defaults.Enabled,
-               "enable the vectorized measure query path (off by default; flip 
after soak)")
+               "enable the vectorized measure query path")
        flagS.IntVar(&cfg.BatchSize, "measure-vectorized-batch-size", 
defaults.BatchSize,
                "row count per vectorized batch")
        flagS.IntVar(&cfg.QueryMemoryMiB, 
"measure-vectorized-query-memory-mib", defaults.QueryMemoryMiB,
                "per-query memory budget for the vectorized path, in MiB")
-       flagS.BoolVar(&cfg.AggregationEnabled, 
"measure-vectorized-aggregation-enabled",
-               defaults.AggregationEnabled,
-               "route GroupBy+Agg requests through the vectorized aggregation 
pipeline "+
-                       "instead of the row-path aggregator (default false; 
flip after egress parity is verified)")
 }
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index 3d5f82ef1..6a442f743 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -228,12 +228,12 @@ func (m *measure) Query(ctx context.Context, mqo 
model.MeasureQueryOptions) (mqr
        // projection. Falls back to nil on schema-build failure; PullBatch
        // checks for nil and returns a clean error rather than degrading
        // the row-path Pull().
-       // G8d.2: thread GroupBy + Agg through so BuildBatchSchema emits native
+       // Thread GroupBy + Agg through so BuildBatchSchema emits native
        // column types for the agg-relevant columns. Plain queries get
-       // passthrough as before; queries that the vec subsystem will fold
-       // over get native int64/float64/string/etc. columns ready for the
-       // BatchAggregation operator (computeKey + fold). The storage decoders
-       // (banyand/measure/batch_decode.go) already handle both column shapes.
+       // passthrough; GroupBy+Agg queries get native int64/float64/string
+       // columns the BatchAggregation operator reads directly (computeKey +
+       // fold). Storage decoders (banyand/measure/batch_decode.go) handle
+       // both column shapes.
        result.batchSchema, _ = vmeasure.BuildBatchSchema(m.schema, 
model.MeasureQueryOptions{
                TagProjection:   result.tagProjection,
                FieldProjection: mqo.FieldProjection,
diff --git a/pkg/query/vectorized/measure/config.go 
b/pkg/query/vectorized/measure/config.go
index df46b947f..6d0a4a7ae 100644
--- a/pkg/query/vectorized/measure/config.go
+++ b/pkg/query/vectorized/measure/config.go
@@ -24,20 +24,12 @@ type VectorizedConfig struct {
        BatchSize      int
        QueryMemoryMiB int
        Enabled        bool
-       // AggregationEnabled opens the dispatch gate to GroupBy+Agg requests
-       // (G8d.2 ships the schema/storage bridge but parity-preserving
-       // aggregation egress is still being engineered). Default false so
-       // production keeps routing GroupBy+Agg through the row path until
-       // the egress contract matches row-path output byte-for-byte.
-       AggregationEnabled bool
 }
 
 // DefaultConfig returns the v1 default — enabled, 1024-row batches, 256 MiB
-// per-query memory budget, aggregation gate off. v1 ships Enabled true post-
-// soak/bench-gate rollout (G5e); AggregationEnabled stays false until the
-// vec aggregation egress reaches row-path parity. To roll back the vec
-// path entirely, pass --measure-vectorized-enabled=false on the standalone
-// or data-node command line and restart.
+// per-query memory budget. To roll back the vec path entirely, pass
+// --measure-vectorized-enabled=false on the standalone or data-node command
+// line and restart.
 func DefaultConfig() VectorizedConfig {
        return VectorizedConfig{Enabled: true, BatchSize: 1024, QueryMemoryMiB: 
256}
 }
diff --git a/pkg/query/vectorized/measure/plan/dispatch.go 
b/pkg/query/vectorized/measure/plan/dispatch.go
index cffe1efe1..d1d07fc2f 100644
--- a/pkg/query/vectorized/measure/plan/dispatch.go
+++ b/pkg/query/vectorized/measure/plan/dispatch.go
@@ -86,8 +86,8 @@ func FellThroughCount() int64 { return 
fellThroughCount.Load() }
 //
 // Eligibility gate (v1):
 //   - cfg.Enabled must be true
-//   - request must NOT carry GroupBy or Agg (column-type bridging at the
-//     scan source still pending; see executor.go's TODO(G8d))
+//   - request may carry GroupBy+Agg as a pair (scalar reduce + raw
+//     GroupBy are deferred); aggProjectionCoverage must also hold
 //   - request must NOT carry Top (BatchTop's single-heap semantic differs
 //     from the row path's per-timestamp top-N)
 //   - request must carry TimeRange (storage requires a bounded window)
@@ -131,15 +131,6 @@ func Dispatch(
        hasGroupBy := req.GetGroupBy() != nil
        hasAgg := req.GetAgg() != nil
        if hasGroupBy || hasAgg {
-               // G8d.2 wires the schema/storage bridge (BuildBatchSchema emits
-               // native columns for GroupBy keys + Agg field, storage decoders
-               // honor it) so the operator pipeline is ready when the egress
-               // reaches row-path parity. Until then, AggregationEnabled gates
-               // the dispatch gate; default false keeps GroupBy+Agg on the row
-               // path so the parity suite stays green.
-               if !cfg.AggregationEnabled {
-                       return nil, "", false, nil
-               }
                // GroupBy and Agg must travel as a pair (scalar reduce + raw
                // groupby are deferred). Either alone falls through.
                if hasGroupBy != hasAgg {
diff --git a/pkg/query/vectorized/measure/plan/dispatch_test.go 
b/pkg/query/vectorized/measure/plan/dispatch_test.go
index c8a8a2532..d0c923e4b 100644
--- a/pkg/query/vectorized/measure/plan/dispatch_test.go
+++ b/pkg/query/vectorized/measure/plan/dispatch_test.go
@@ -36,14 +36,6 @@ func dispatchCfg(enabled bool) measure.VectorizedConfig {
        return measure.VectorizedConfig{Enabled: enabled, BatchSize: 1024, 
QueryMemoryMiB: 16}
 }
 
-// dispatchCfgWithAgg returns a vec-enabled config with the G8d.2
-// aggregation gate flipped on so dispatch handles GroupBy+Agg requests.
-func dispatchCfgWithAgg() measure.VectorizedConfig {
-       cfg := dispatchCfg(true)
-       cfg.AggregationEnabled = true
-       return cfg
-}
-
 func bareReq() *measurev1.QueryRequest {
        return &measurev1.QueryRequest{
                Name:            "demo",
@@ -73,34 +65,8 @@ func TestDispatch_NotEnabled_FallsThrough(t *testing.T) {
        }
 }
 
-// TestDispatch_GroupByAgg_FlagOff_FallsThrough pins the G8d.2 default:
-// even a fully-covered GroupBy+Agg request falls through when
-// AggregationEnabled is false (the default). This is the load-bearing
-// behavior that keeps the integration parity suite green while the
-// vec aggregation egress is engineered to match row-path output.
-func TestDispatch_GroupByAgg_FlagOff_FallsThrough(t *testing.T) {
-       req := bareReq()
-       req.GroupBy = &measurev1.QueryRequest_GroupBy{
-               TagProjection: projTagProj(),
-               FieldName:     "value",
-       }
-       req.Agg = &measurev1.QueryRequest_Aggregation{
-               Function:  modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM,
-               FieldName: "value",
-       }
-       _, _, handled, err := Dispatch(context.Background(),
-               req, nil, nil, nil, nil, dispatchCfg(true)) // 
AggregationEnabled = false (default)
-       if err != nil {
-               t.Fatalf("flag-off fallthrough must not error: %v", err)
-       }
-       if handled {
-               t.Fatal("GroupBy+Agg must fall through when AggregationEnabled 
is false")
-       }
-}
-
 // TestDispatch_GroupByWithoutAgg_FallsThrough covers the pair check
-// (GroupBy and Agg must travel together). Uses the agg-enabled config
-// so we reach the pair check rather than the flag-off gate.
+// (GroupBy and Agg must travel together).
 func TestDispatch_GroupByWithoutAgg_FallsThrough(t *testing.T) {
        req := bareReq()
        req.GroupBy = &measurev1.QueryRequest_GroupBy{
@@ -108,7 +74,7 @@ func TestDispatch_GroupByWithoutAgg_FallsThrough(t 
*testing.T) {
                FieldName:     "value",
        }
        _, _, handled, err := Dispatch(context.Background(),
-               req, nil, nil, nil, nil, dispatchCfgWithAgg())
+               req, nil, nil, nil, nil, dispatchCfg(true))
        if err != nil {
                t.Fatalf("GroupBy-without-Agg fallthrough must not error: %v", 
err)
        }
@@ -126,7 +92,7 @@ func TestDispatch_AggWithoutGroupBy_FallsThrough(t 
*testing.T) {
                FieldName: "value",
        }
        _, _, handled, err := Dispatch(context.Background(),
-               req, nil, nil, nil, nil, dispatchCfgWithAgg())
+               req, nil, nil, nil, nil, dispatchCfg(true))
        if err != nil {
                t.Fatalf("Agg-without-GroupBy fallthrough must not error: %v", 
err)
        }
@@ -185,7 +151,7 @@ func 
TestDispatch_GroupByAggUncoveredProjection_FallsThrough(t *testing.T) {
                        req := bareReq()
                        c.mutate(req)
                        _, _, handled, err := Dispatch(context.Background(),
-                               req, nil, nil, nil, nil, dispatchCfgWithAgg())
+                               req, nil, nil, nil, nil, dispatchCfg(true))
                        if err != nil {
                                t.Fatalf("uncovered projection must not error: 
%v", err)
                        }
@@ -408,13 +374,12 @@ func TestDispatch_Counters_TrackFellThroughCalls(t 
*testing.T) {
        }
 }
 
-// TestDispatch_GroupByAggCovered_ReachesEcQuery confirms G8d.2 has
-// opened the dispatch gate for GroupBy+Agg requests whose projection
-// covers both the GroupBy keys and the Agg field. The fakeEC returns
-// nil so dispatch falls through after ec.Query (matching the empty-
-// result branch) — what matters is that ec.Query was invoked at all,
-// proving the eligibility gate let the request through. With G8d.1
-// this would have been rejected before any ec interaction.
+// TestDispatch_GroupByAggCovered_ReachesEcQuery confirms the dispatch
+// gate admits GroupBy+Agg requests whose projection covers both the
+// GroupBy keys and the Agg field. fakeEC returns nil so dispatch falls
+// through after ec.Query (matching the empty-result branch) — what
+// matters is that ec.Query was invoked at all, proving the eligibility
+// gate let the request through.
 func TestDispatch_GroupByAggCovered_ReachesEcQuery(t *testing.T) {
        measureSchema := testMeasureSchema()
        // nolint:staticcheck // SA1019 — row-path BuildSchema is the only 
schema builder until G8 replaces it.
@@ -436,7 +401,7 @@ func TestDispatch_GroupByAggCovered_ReachesEcQuery(t 
*testing.T) {
        }
 
        iter, planStr, handled, err := Dispatch(context.Background(),
-               req, metadata, measureSchema, logicalSchema, ec, 
dispatchCfgWithAgg())
+               req, metadata, measureSchema, logicalSchema, ec, 
dispatchCfg(true))
        if err != nil {
                t.Fatalf("covered GroupBy+Agg must not error before ec.Query: 
%v", err)
        }
@@ -444,7 +409,7 @@ func TestDispatch_GroupByAggCovered_ReachesEcQuery(t 
*testing.T) {
                // Surface the other return values for debugging when the gate
                // regression resurfaces — they're all zero-valued today because
                // ec.Query returned (nil, nil) and dispatch fell through.
-               t.Fatalf("covered GroupBy+Agg must reach ec.Query (G8d.2 gate); 
"+
+               t.Fatalf("covered GroupBy+Agg must reach ec.Query (dispatch 
gate); "+
                        "got iter=%v planStr=%q handled=%v", iter, planStr, 
handled)
        }
 }
diff --git a/pkg/query/vectorized/measure/plan/executor_test.go 
b/pkg/query/vectorized/measure/plan/executor_test.go
index a0bd86d59..f2148874e 100644
--- a/pkg/query/vectorized/measure/plan/executor_test.go
+++ b/pkg/query/vectorized/measure/plan/executor_test.go
@@ -123,19 +123,6 @@ func 
TestExecute_GroupByAgg_EmitsAggregatedRowsWithNilTimestamp(t *testing.T) {
        }
 }
 
-// TODO(G8d): an end-to-end Analyze+Execute test against the analyzer-
-// produced BatchSchema would exercise the agg pipeline against the same
-// column layout the scan source emits in production. Today
-// BuildBatchSchema uses passthrough *modelv1.FieldValue columns (to keep
-// row-path egress zero-alloc), but BatchAggregation.fold requires native
-// int64/float64. G8d must either:
-//   - convert passthrough → native via a fusible at the scan-source
-//     boundary when the pipeline contains a fold operator, or
-//   - teach BatchAggregation to fold over *modelv1.FieldValue.
-// Until that bridge lands, the executor tests above use a hand-built
-// schema with native typed columns to verify executor correctness in
-// isolation.
-
 func TestExecute_BuildError_SurfacesAsExecuteError(t *testing.T) {
        schema, _ := buildScanInput(t)
        scan := NewScan(schema, ScanParams{}) // Source intentionally unset
diff --git a/test/integration/standalone/query/vectorized_test.go 
b/test/integration/standalone/query/vectorized_test.go
index 5a974ddf7..abd037352 100644
--- a/test/integration/standalone/query/vectorized_test.go
+++ b/test/integration/standalone/query/vectorized_test.go
@@ -89,7 +89,6 @@ var _ = ginkgo.Describe("vectorized parity", ginkgo.Ordered, 
func() {
                config := setup.PropertyClusterConfig(dfWriter)
                addr, _, closeFn := setup.ClosableStandalone(config, path, 
ports,
                        "--measure-vectorized-enabled=true",
-                       "--measure-vectorized-aggregation-enabled=true",
                )
                stopFn = func() {
                        closeFn()

Reply via email to