This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch vectorized-query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 7c0d9259f9c7390d920db8c68463f2e21df292b7 Author: Hongtao Gao <[email protected]> AuthorDate: Wed May 13 23:23:13 2026 +0000 fix(query/vectorized/measure): agg output column inherits input field name for row-path parity (G8d.3 partial) The row-path aggregator (aggGroupIterator.Current() in pkg/query/logical/measure/measure_plan_aggregation.go) emits a single DataPoint_Field per group whose Name is the ORIGINAL input field name (e.g. "value"). BatchAggregation was emitting an auto-derived "<field>_<func>" name (e.g. "value_sum"), which broke proto.Equal parity in the integration suite — 8 of 9 GroupBy+Agg test cases failed purely on the field-name diff. Drop the auto-derivation in pkg/query/vectorized/measure/plan.go's BuildOperators and pass opts.Agg.FieldName directly as AggSpec.Output. The aggregation function is carried by the operator spec, not encoded in the column name; nothing else in the output schema construction relies on the suffix being present. Test updates: - TestBuildOperators_AggOutputName_IsFieldUnderscoreFunc was pinning the wrong contract; it is renamed to TestBuildOperators_AggOutputName_InheritsInputFieldName and repurposed to assert "value" for every AggFunc (the row-path parity invariant). - build_test.go's TestGroupByAgg_Schema_DropsTimestampAddsAggField and executor_test.go's TestExecute_GroupByAgg_EmitsAggregatedRowsWithNilTimestamp had their Field.Name expectations changed from "value_sum" to "value". Verified with --measure-vectorized-aggregation-enabled=true in the parity standalone: 8 of 9 GroupBy+Agg cases now pass byte-for-byte. The remaining failure is "group and max", which is a timeout rather than a value mismatch and so has a distinct cause; it is tracked for the G8d.3 follow-up. The vectorized parity integration test in this commit keeps the aggregation flag OFF until that remaining case is fixed, so the gate stays green. 
--- pkg/query/vectorized/measure/plan.go | 28 +++++----------- pkg/query/vectorized/measure/plan/build_test.go | 6 ++-- pkg/query/vectorized/measure/plan/executor_test.go | 4 +-- pkg/query/vectorized/measure/plan_test.go | 38 ++++++++++++---------- .../standalone/query/vectorized_test.go | 5 +++ 5 files changed, 40 insertions(+), 41 deletions(-) diff --git a/pkg/query/vectorized/measure/plan.go b/pkg/query/vectorized/measure/plan.go index 1cd0098ad..0df501bf9 100644 --- a/pkg/query/vectorized/measure/plan.go +++ b/pkg/query/vectorized/measure/plan.go @@ -19,7 +19,6 @@ package measure import ( "fmt" - "strings" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/query/model" @@ -86,10 +85,18 @@ func BuildOperators( return nil, fnErr } + // The agg result column inherits the input field's name to match the + // row-path aggregator (aggGroupIterator.Current() in + // pkg/query/logical/measure/measure_plan_aggregation.go). Row-path + // fixtures expect a single output field named after the original + // input (e.g. "value"), not an auto-derived "<field>_<func>" suffix + // like "value_sum" — the suffix would break proto.Equal parity in the + // integration suite. The aggregation function lives on the operator + // spec, not in the column name. spec := AggSpec{ Func: aggFn, InputCol: fieldIdx, - Output: aggOutputName(opts.Agg.FieldName, aggFn), + Output: opts.Agg.FieldName, } agg := NewBatchAggregation(schema, keyIndices, []AggSpec{spec}, AggModeAll, batchSize, tracker, aggEntrySize) @@ -148,20 +155,3 @@ func protoAggFuncToInternal(f modelv1.AggregationFunction) (AggFunc, error) { return 0, fmt.Errorf("vectorized.measure: unknown AggregationFunction %v", f) } -// aggOutputName derives the agg result column name: <field>_<func> (lowercase). 
-func aggOutputName(fieldName string, fn AggFunc) string { - suffix := "" - switch fn { - case AggSum: - suffix = "sum" - case AggCount: - suffix = "count" - case AggMin: - suffix = "min" - case AggMax: - suffix = "max" - case AggMean: - suffix = "mean" - } - return strings.Join([]string{fieldName, suffix}, "_") -} diff --git a/pkg/query/vectorized/measure/plan/build_test.go b/pkg/query/vectorized/measure/plan/build_test.go index e04a33d82..88fc05a11 100644 --- a/pkg/query/vectorized/measure/plan/build_test.go +++ b/pkg/query/vectorized/measure/plan/build_test.go @@ -218,14 +218,14 @@ func TestGroupByAgg_Schema_DropsTimestampAddsAggField(t *testing.T) { if out.TimestampIndex() >= 0 { t.Fatalf("aggregation output must drop timestamp (D2); got index %d", out.TimestampIndex()) } - // 2 columns: svc key + value_sum agg result. + // 2 columns: svc key + value agg result (row-path parity name). if len(out.Columns) != 2 { t.Fatalf("want 2 output columns (key + agg result), got %d", len(out.Columns)) } if out.Columns[0].Name != "svc" { t.Fatalf("col 0 should be the svc key, got %s", out.Columns[0].Name) } - if out.Columns[1].Name != "value_sum" { - t.Fatalf("col 1 should be value_sum, got %s", out.Columns[1].Name) + if out.Columns[1].Name != "value" { + t.Fatalf("col 1 should be 'value' (row-path parity), got %s", out.Columns[1].Name) } } diff --git a/pkg/query/vectorized/measure/plan/executor_test.go b/pkg/query/vectorized/measure/plan/executor_test.go index 81e170ab5..a0bd86d59 100644 --- a/pkg/query/vectorized/measure/plan/executor_test.go +++ b/pkg/query/vectorized/measure/plan/executor_test.go @@ -110,8 +110,8 @@ func TestExecute_GroupByAgg_EmitsAggregatedRowsWithNilTimestamp(t *testing.T) { t.Fatalf("want one Tag 'svc', got %+v", tags) } svc := tags[0].Value.GetStr().GetValue() - if len(idp.DataPoint.Fields) != 1 || idp.DataPoint.Fields[0].Name != "value_sum" { - t.Fatalf("want one Field 'value_sum', got %+v", idp.DataPoint.Fields) + if len(idp.DataPoint.Fields) != 1 
|| idp.DataPoint.Fields[0].Name != "value" { + t.Fatalf("want one Field 'value' (row-path parity), got %+v", idp.DataPoint.Fields) } bySvc[svc] = idp.DataPoint.Fields[0].Value.GetInt().GetValue() } diff --git a/pkg/query/vectorized/measure/plan_test.go b/pkg/query/vectorized/measure/plan_test.go index 0e9ff8370..ed4b7e4ea 100644 --- a/pkg/query/vectorized/measure/plan_test.go +++ b/pkg/query/vectorized/measure/plan_test.go @@ -63,31 +63,35 @@ func TestBuildOperators_GroupByPlusAgg_EmitsBatchAggregation(t *testing.T) { } } -func TestBuildOperators_AggOutputName_IsFieldUnderscoreFunc(t *testing.T) { - cases := []struct { - want string - fn modelv1.AggregationFunction - }{ - {"value_sum", modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM}, - {"value_count", modelv1.AggregationFunction_AGGREGATION_FUNCTION_COUNT}, - {"value_min", modelv1.AggregationFunction_AGGREGATION_FUNCTION_MIN}, - {"value_max", modelv1.AggregationFunction_AGGREGATION_FUNCTION_MAX}, - {"value_mean", modelv1.AggregationFunction_AGGREGATION_FUNCTION_MEAN}, - } - for _, c := range cases { +// TestBuildOperators_AggOutputName_InheritsInputFieldName pins the +// G8d.2 row-path-parity name: the agg result column reuses the input +// field name (e.g. "value") for every AggFunc, matching the row-path +// aggGroupIterator.Current() that emits a single DataPoint_Field named +// after the original input field. Any auto-derived "<field>_<func>" +// suffix would break proto.Equal parity in the integration suite. 
+func TestBuildOperators_AggOutputName_InheritsInputFieldName(t *testing.T) { + fns := []modelv1.AggregationFunction{ + modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM, + modelv1.AggregationFunction_AGGREGATION_FUNCTION_COUNT, + modelv1.AggregationFunction_AGGREGATION_FUNCTION_MIN, + modelv1.AggregationFunction_AGGREGATION_FUNCTION_MAX, + modelv1.AggregationFunction_AGGREGATION_FUNCTION_MEAN, + } + const wantName = "value" + for _, fn := range fns { opts := model.MeasureQueryOptions{ GroupBy: &model.MeasureGroupBy{TagFamily: "default", TagNames: []string{"svc"}}, - Agg: &model.MeasureAgg{FieldName: "value", Func: c.fn}, + Agg: &model.MeasureAgg{FieldName: "value", Func: fn}, } ops, err := BuildOperators(opts, planSchema(), vectorized.NewMemoryTracker(1<<20), 1024) if err != nil { - t.Fatalf("%v: BuildOperators error: %v", c.fn, err) + t.Fatalf("%v: BuildOperators error: %v", fn, err) } agg := ops[0].(*BatchAggregation) - // Output schema's last column is the agg result; its Name is the auto-derived output name. + // Output schema layout: key columns then the agg result column. 
got := agg.OutputSchema().Columns[len(agg.OutputSchema().Columns)-1].Name - if got != c.want { - t.Fatalf("%v: want output column name %q, got %q", c.fn, c.want, got) + if got != wantName { + t.Fatalf("%v: want output column name %q (row-path parity), got %q", fn, wantName, got) } } } diff --git a/test/integration/standalone/query/vectorized_test.go b/test/integration/standalone/query/vectorized_test.go index abd037352..f577b7fac 100644 --- a/test/integration/standalone/query/vectorized_test.go +++ b/test/integration/standalone/query/vectorized_test.go @@ -89,6 +89,11 @@ var _ = ginkgo.Describe("vectorized parity", ginkgo.Ordered, func() { config := setup.PropertyClusterConfig(dfWriter) addr, _, closeFn := setup.ClosableStandalone(config, path, ports, "--measure-vectorized-enabled=true", + // G8d.3 will add --measure-vectorized-aggregation-enabled=true + // here once the MAX-specific egress parity gap is closed. + // 8 of 9 GroupBy+Agg cases already pass with the gate on; the + // remaining failure (group_max timeout) is tracked separately + // before this flag flips in the parity gate too. ) stopFn = func() { closeFn()
