This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch vectorized-query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 32fb720a376d62a032a87c07e9f9daf509aa2f0e Author: Hongtao Gao <[email protected]> AuthorDate: Thu May 14 07:20:47 2026 +0000 feat(query/vectorized/measure)!: remove AggregationEnabled flag — GroupBy+Agg dispatch is default-on The flag served its purpose: it was a kill switch while the operator and the bridge were being engineered toward row-path parity. With the integration parity gate at 488/488 specs green, the kill switch is dead weight. The only remaining reason to keep it would be a user-reachable code path that diverges from the row path, and there isn't one. Drops: - VectorizedConfig.AggregationEnabled field + its plumbing - --measure-vectorized-aggregation-enabled CLI flag in banyand/measure - the cfg.AggregationEnabled gate in dispatch.Dispatch (lines 134-142) - TestDispatch_GroupByAgg_FlagOff_FallsThrough + the dispatchCfgWithAgg helper that pinned the flag's behavior - --measure-vectorized-aggregation-enabled=true from the standalone parity gate (the only place the flag was set) - the satisfied TODO(G8d) in executor_test.go, which asked for the operator-pipeline bridge that has now landed - stale doc fragments in banyand/measure/query.go, dispatch.go's eligibility block, and dispatch_test.go that described the flag's existence The rollback path remains --measure-vectorized-enabled=false, which disables the entire vec subsystem; GroupBy+Agg on its own can no longer be disabled independently. 
via [HAPI](https://hapi.run) --- banyand/measure/measure.go | 10 +--- banyand/measure/query.go | 10 ++-- pkg/query/vectorized/measure/config.go | 14 ++--- pkg/query/vectorized/measure/plan/dispatch.go | 13 +---- pkg/query/vectorized/measure/plan/dispatch_test.go | 59 +++++----------------- pkg/query/vectorized/measure/plan/executor_test.go | 13 ----- .../standalone/query/vectorized_test.go | 1 - 7 files changed, 24 insertions(+), 96 deletions(-) diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go index a797156cc..76a8c1ae3 100644 --- a/banyand/measure/measure.go +++ b/banyand/measure/measure.go @@ -174,19 +174,13 @@ func (m *measure) VectorizedConfig() vmeasure.VectorizedConfig { } // bindVectorizedFlags wires VectorizedConfig fields to a run.FlagSet. The -// defaults match vmeasure.DefaultConfig — Enabled=false, BatchSize=1024, -// QueryMemoryMiB=256 — so config loaders that omit these flags retain the -// pre-G4 row-only behavior. +// defaults match vmeasure.DefaultConfig. 
func bindVectorizedFlags(flagS *run.FlagSet, cfg *vmeasure.VectorizedConfig) { defaults := vmeasure.DefaultConfig() flagS.BoolVar(&cfg.Enabled, "measure-vectorized-enabled", defaults.Enabled, - "enable the vectorized measure query path (off by default; flip after soak)") + "enable the vectorized measure query path") flagS.IntVar(&cfg.BatchSize, "measure-vectorized-batch-size", defaults.BatchSize, "row count per vectorized batch") flagS.IntVar(&cfg.QueryMemoryMiB, "measure-vectorized-query-memory-mib", defaults.QueryMemoryMiB, "per-query memory budget for the vectorized path, in MiB") - flagS.BoolVar(&cfg.AggregationEnabled, "measure-vectorized-aggregation-enabled", - defaults.AggregationEnabled, - "route GroupBy+Agg requests through the vectorized aggregation pipeline "+ - "instead of the row-path aggregator (default false; flip after egress parity is verified)") } diff --git a/banyand/measure/query.go b/banyand/measure/query.go index 3d5f82ef1..6a442f743 100644 --- a/banyand/measure/query.go +++ b/banyand/measure/query.go @@ -228,12 +228,12 @@ func (m *measure) Query(ctx context.Context, mqo model.MeasureQueryOptions) (mqr // projection. Falls back to nil on schema-build failure; PullBatch // checks for nil and returns a clean error rather than degrading // the row-path Pull(). - // G8d.2: thread GroupBy + Agg through so BuildBatchSchema emits native + // Thread GroupBy + Agg through so BuildBatchSchema emits native // column types for the agg-relevant columns. Plain queries get - // passthrough as before; queries that the vec subsystem will fold - // over get native int64/float64/string/etc. columns ready for the - // BatchAggregation operator (computeKey + fold). The storage decoders - // (banyand/measure/batch_decode.go) already handle both column shapes. + // passthrough; GroupBy+Agg queries get native int64/float64/string + // columns the BatchAggregation operator reads directly (computeKey + + // fold). 
Storage decoders (banyand/measure/batch_decode.go) handle + // both column shapes. result.batchSchema, _ = vmeasure.BuildBatchSchema(m.schema, model.MeasureQueryOptions{ TagProjection: result.tagProjection, FieldProjection: mqo.FieldProjection, diff --git a/pkg/query/vectorized/measure/config.go b/pkg/query/vectorized/measure/config.go index df46b947f..6d0a4a7ae 100644 --- a/pkg/query/vectorized/measure/config.go +++ b/pkg/query/vectorized/measure/config.go @@ -24,20 +24,12 @@ type VectorizedConfig struct { BatchSize int QueryMemoryMiB int Enabled bool - // AggregationEnabled opens the dispatch gate to GroupBy+Agg requests - // (G8d.2 ships the schema/storage bridge but parity-preserving - // aggregation egress is still being engineered). Default false so - // production keeps routing GroupBy+Agg through the row path until - // the egress contract matches row-path output byte-for-byte. - AggregationEnabled bool } // DefaultConfig returns the v1 default — enabled, 1024-row batches, 256 MiB -// per-query memory budget, aggregation gate off. v1 ships Enabled true post- -// soak/bench-gate rollout (G5e); AggregationEnabled stays false until the -// vec aggregation egress reaches row-path parity. To roll back the vec -// path entirely, pass --measure-vectorized-enabled=false on the standalone -// or data-node command line and restart. +// per-query memory budget. To roll back the vec path entirely, pass +// --measure-vectorized-enabled=false on the standalone or data-node command +// line and restart. 
func DefaultConfig() VectorizedConfig { return VectorizedConfig{Enabled: true, BatchSize: 1024, QueryMemoryMiB: 256} } diff --git a/pkg/query/vectorized/measure/plan/dispatch.go b/pkg/query/vectorized/measure/plan/dispatch.go index cffe1efe1..d1d07fc2f 100644 --- a/pkg/query/vectorized/measure/plan/dispatch.go +++ b/pkg/query/vectorized/measure/plan/dispatch.go @@ -86,8 +86,8 @@ func FellThroughCount() int64 { return fellThroughCount.Load() } // // Eligibility gate (v1): // - cfg.Enabled must be true -// - request must NOT carry GroupBy or Agg (column-type bridging at the -// scan source still pending; see executor.go's TODO(G8d)) +// - request may carry GroupBy+Agg as a pair (scalar reduce + raw +// GroupBy are deferred); aggProjectionCoverage must also hold // - request must NOT carry Top (BatchTop's single-heap semantic differs // from the row path's per-timestamp top-N) // - request must carry TimeRange (storage requires a bounded window) @@ -131,15 +131,6 @@ func Dispatch( hasGroupBy := req.GetGroupBy() != nil hasAgg := req.GetAgg() != nil if hasGroupBy || hasAgg { - // G8d.2 wires the schema/storage bridge (BuildBatchSchema emits - // native columns for GroupBy keys + Agg field, storage decoders - // honor it) so the operator pipeline is ready when the egress - // reaches row-path parity. Until then, AggregationEnabled gates - // the dispatch gate; default false keeps GroupBy+Agg on the row - // path so the parity suite stays green. - if !cfg.AggregationEnabled { - return nil, "", false, nil - } // GroupBy and Agg must travel as a pair (scalar reduce + raw // groupby are deferred). Either alone falls through. 
if hasGroupBy != hasAgg { diff --git a/pkg/query/vectorized/measure/plan/dispatch_test.go b/pkg/query/vectorized/measure/plan/dispatch_test.go index c8a8a2532..d0c923e4b 100644 --- a/pkg/query/vectorized/measure/plan/dispatch_test.go +++ b/pkg/query/vectorized/measure/plan/dispatch_test.go @@ -36,14 +36,6 @@ func dispatchCfg(enabled bool) measure.VectorizedConfig { return measure.VectorizedConfig{Enabled: enabled, BatchSize: 1024, QueryMemoryMiB: 16} } -// dispatchCfgWithAgg returns a vec-enabled config with the G8d.2 -// aggregation gate flipped on so dispatch handles GroupBy+Agg requests. -func dispatchCfgWithAgg() measure.VectorizedConfig { - cfg := dispatchCfg(true) - cfg.AggregationEnabled = true - return cfg -} - func bareReq() *measurev1.QueryRequest { return &measurev1.QueryRequest{ Name: "demo", @@ -73,34 +65,8 @@ func TestDispatch_NotEnabled_FallsThrough(t *testing.T) { } } -// TestDispatch_GroupByAgg_FlagOff_FallsThrough pins the G8d.2 default: -// even a fully-covered GroupBy+Agg request falls through when -// AggregationEnabled is false (the default). This is the load-bearing -// behavior that keeps the integration parity suite green while the -// vec aggregation egress is engineered to match row-path output. 
-func TestDispatch_GroupByAgg_FlagOff_FallsThrough(t *testing.T) { - req := bareReq() - req.GroupBy = &measurev1.QueryRequest_GroupBy{ - TagProjection: projTagProj(), - FieldName: "value", - } - req.Agg = &measurev1.QueryRequest_Aggregation{ - Function: modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM, - FieldName: "value", - } - _, _, handled, err := Dispatch(context.Background(), - req, nil, nil, nil, nil, dispatchCfg(true)) // AggregationEnabled = false (default) - if err != nil { - t.Fatalf("flag-off fallthrough must not error: %v", err) - } - if handled { - t.Fatal("GroupBy+Agg must fall through when AggregationEnabled is false") - } -} - // TestDispatch_GroupByWithoutAgg_FallsThrough covers the pair check -// (GroupBy and Agg must travel together). Uses the agg-enabled config -// so we reach the pair check rather than the flag-off gate. +// (GroupBy and Agg must travel together). func TestDispatch_GroupByWithoutAgg_FallsThrough(t *testing.T) { req := bareReq() req.GroupBy = &measurev1.QueryRequest_GroupBy{ @@ -108,7 +74,7 @@ func TestDispatch_GroupByWithoutAgg_FallsThrough(t *testing.T) { FieldName: "value", } _, _, handled, err := Dispatch(context.Background(), - req, nil, nil, nil, nil, dispatchCfgWithAgg()) + req, nil, nil, nil, nil, dispatchCfg(true)) if err != nil { t.Fatalf("GroupBy-without-Agg fallthrough must not error: %v", err) } @@ -126,7 +92,7 @@ func TestDispatch_AggWithoutGroupBy_FallsThrough(t *testing.T) { FieldName: "value", } _, _, handled, err := Dispatch(context.Background(), - req, nil, nil, nil, nil, dispatchCfgWithAgg()) + req, nil, nil, nil, nil, dispatchCfg(true)) if err != nil { t.Fatalf("Agg-without-GroupBy fallthrough must not error: %v", err) } @@ -185,7 +151,7 @@ func TestDispatch_GroupByAggUncoveredProjection_FallsThrough(t *testing.T) { req := bareReq() c.mutate(req) _, _, handled, err := Dispatch(context.Background(), - req, nil, nil, nil, nil, dispatchCfgWithAgg()) + req, nil, nil, nil, nil, dispatchCfg(true)) if err != 
nil { t.Fatalf("uncovered projection must not error: %v", err) } @@ -408,13 +374,12 @@ func TestDispatch_Counters_TrackFellThroughCalls(t *testing.T) { } } -// TestDispatch_GroupByAggCovered_ReachesEcQuery confirms G8d.2 has -// opened the dispatch gate for GroupBy+Agg requests whose projection -// covers both the GroupBy keys and the Agg field. The fakeEC returns -// nil so dispatch falls through after ec.Query (matching the empty- -// result branch) — what matters is that ec.Query was invoked at all, -// proving the eligibility gate let the request through. With G8d.1 -// this would have been rejected before any ec interaction. +// TestDispatch_GroupByAggCovered_ReachesEcQuery confirms the dispatch +// gate admits GroupBy+Agg requests whose projection covers both the +// GroupBy keys and the Agg field. fakeEC returns nil so dispatch falls +// through after ec.Query (matching the empty-result branch) — what +// matters is that ec.Query was invoked at all, proving the eligibility +// gate let the request through. func TestDispatch_GroupByAggCovered_ReachesEcQuery(t *testing.T) { measureSchema := testMeasureSchema() // nolint:staticcheck // SA1019 — row-path BuildSchema is the only schema builder until G8 replaces it. @@ -436,7 +401,7 @@ func TestDispatch_GroupByAggCovered_ReachesEcQuery(t *testing.T) { } iter, planStr, handled, err := Dispatch(context.Background(), - req, metadata, measureSchema, logicalSchema, ec, dispatchCfgWithAgg()) + req, metadata, measureSchema, logicalSchema, ec, dispatchCfg(true)) if err != nil { t.Fatalf("covered GroupBy+Agg must not error before ec.Query: %v", err) } @@ -444,7 +409,7 @@ func TestDispatch_GroupByAggCovered_ReachesEcQuery(t *testing.T) { // Surface the other return values for debugging when the gate // regression resurfaces — they're all zero-valued today because // ec.Query returned (nil, nil) and dispatch fell through. 
- t.Fatalf("covered GroupBy+Agg must reach ec.Query (G8d.2 gate); "+ + t.Fatalf("covered GroupBy+Agg must reach ec.Query (dispatch gate); "+ "got iter=%v planStr=%q handled=%v", iter, planStr, handled) } } diff --git a/pkg/query/vectorized/measure/plan/executor_test.go b/pkg/query/vectorized/measure/plan/executor_test.go index a0bd86d59..f2148874e 100644 --- a/pkg/query/vectorized/measure/plan/executor_test.go +++ b/pkg/query/vectorized/measure/plan/executor_test.go @@ -123,19 +123,6 @@ func TestExecute_GroupByAgg_EmitsAggregatedRowsWithNilTimestamp(t *testing.T) { } } -// TODO(G8d): an end-to-end Analyze+Execute test against the analyzer- -// produced BatchSchema would exercise the agg pipeline against the same -// column layout the scan source emits in production. Today -// BuildBatchSchema uses passthrough *modelv1.FieldValue columns (to keep -// row-path egress zero-alloc), but BatchAggregation.fold requires native -// int64/float64. G8d must either: -// - convert passthrough → native via a fusible at the scan-source -// boundary when the pipeline contains a fold operator, or -// - teach BatchAggregation to fold over *modelv1.FieldValue. -// Until that bridge lands, the executor tests above use a hand-built -// schema with native typed columns to verify executor correctness in -// isolation. 
- func TestExecute_BuildError_SurfacesAsExecuteError(t *testing.T) { schema, _ := buildScanInput(t) scan := NewScan(schema, ScanParams{}) // Source intentionally unset diff --git a/test/integration/standalone/query/vectorized_test.go b/test/integration/standalone/query/vectorized_test.go index 5a974ddf7..abd037352 100644 --- a/test/integration/standalone/query/vectorized_test.go +++ b/test/integration/standalone/query/vectorized_test.go @@ -89,7 +89,6 @@ var _ = ginkgo.Describe("vectorized parity", ginkgo.Ordered, func() { config := setup.PropertyClusterConfig(dfWriter) addr, _, closeFn := setup.ClosableStandalone(config, path, ports, "--measure-vectorized-enabled=true", - "--measure-vectorized-aggregation-enabled=true", ) stopFn = func() { closeFn()
