This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch vectorized-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 7c0d9259f9c7390d920db8c68463f2e21df292b7
Author: Hongtao Gao <[email protected]>
AuthorDate: Wed May 13 23:23:13 2026 +0000

    fix(query/vectorized/measure): agg output column inherits input field name for row-path parity (G8d.3 partial)
    
    The row-path aggregator (aggGroupIterator.Current() in
    pkg/query/logical/measure/measure_plan_aggregation.go) emits a single
    DataPoint_Field per group whose Name is the ORIGINAL input field name
    (e.g. "value"). BatchAggregation was emitting an auto-derived
    "<field>_<func>" name (e.g. "value_sum"), which broke proto.Equal
    parity in the integration suite — 8 of 9 GroupBy+Agg test cases failed
    purely on the field-name diff.
    
    Drop the auto-derivation in pkg/query/vectorized/measure/plan.go's
    BuildOperators and pass opts.Agg.FieldName directly as AggSpec.Output.
    The aggregation function lives on the operator spec, not in the
    column name; nothing about the output schema construction otherwise
    relies on the suffix being present.
    
    Test updates:
    
    - TestBuildOperators_AggOutputName_IsFieldUnderscoreFunc was pinning
      the wrong contract; renamed to
      TestBuildOperators_AggOutputName_InheritsInputFieldName and
      re-purposed to assert "value" for every AggFunc (the row-path
      parity invariant).
    - build_test.go's TestGroupByAgg_Schema_DropsTimestampAddsAggField and
      executor_test.go's
      TestExecute_GroupByAgg_EmitsAggregatedRowsWithNilTimestamp updated
      their Field.Name expectations from "value_sum" to "value".
    
    Verified with --measure-vectorized-aggregation-enabled=true against
    the standalone parity suite: 8 of 9 GroupBy+Agg cases now pass
    byte-for-byte. The remaining failure is "group and max" (a timeout,
    not a value mismatch, so it has a distinct cause) and is tracked as a
    G8d.3 follow-up. The vectorized parity test in this commit keeps the
    flag OFF until that case is fixed, so the gate stays green.
---
 pkg/query/vectorized/measure/plan.go               | 28 +++++-----------
 pkg/query/vectorized/measure/plan/build_test.go    |  6 ++--
 pkg/query/vectorized/measure/plan/executor_test.go |  4 +--
 pkg/query/vectorized/measure/plan_test.go          | 38 ++++++++++++----------
 .../standalone/query/vectorized_test.go            |  5 +++
 5 files changed, 40 insertions(+), 41 deletions(-)

diff --git a/pkg/query/vectorized/measure/plan.go 
b/pkg/query/vectorized/measure/plan.go
index 1cd0098ad..0df501bf9 100644
--- a/pkg/query/vectorized/measure/plan.go
+++ b/pkg/query/vectorized/measure/plan.go
@@ -19,7 +19,6 @@ package measure
 
 import (
        "fmt"
-       "strings"
 
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
@@ -86,10 +85,18 @@ func BuildOperators(
                return nil, fnErr
        }
 
+       // The agg result column inherits the input field's name to match the
+       // row-path aggregator (aggGroupIterator.Current() in
+       // pkg/query/logical/measure/measure_plan_aggregation.go). Row-path
+       // fixtures expect a single output field named after the original
+       // input (e.g. "value"), not an auto-derived "<field>_<func>" suffix
+       // like "value_sum" — the suffix would break proto.Equal parity in the
+       // integration suite. The aggregation function lives on the operator
+       // spec, not in the column name.
        spec := AggSpec{
                Func:     aggFn,
                InputCol: fieldIdx,
-               Output:   aggOutputName(opts.Agg.FieldName, aggFn),
+               Output:   opts.Agg.FieldName,
        }
        agg := NewBatchAggregation(schema, keyIndices, []AggSpec{spec},
                AggModeAll, batchSize, tracker, aggEntrySize)
@@ -148,20 +155,3 @@ func protoAggFuncToInternal(f modelv1.AggregationFunction) 
(AggFunc, error) {
        return 0, fmt.Errorf("vectorized.measure: unknown AggregationFunction 
%v", f)
 }
 
-// aggOutputName derives the agg result column name: <field>_<func> 
(lowercase).
-func aggOutputName(fieldName string, fn AggFunc) string {
-       suffix := ""
-       switch fn {
-       case AggSum:
-               suffix = "sum"
-       case AggCount:
-               suffix = "count"
-       case AggMin:
-               suffix = "min"
-       case AggMax:
-               suffix = "max"
-       case AggMean:
-               suffix = "mean"
-       }
-       return strings.Join([]string{fieldName, suffix}, "_")
-}
diff --git a/pkg/query/vectorized/measure/plan/build_test.go 
b/pkg/query/vectorized/measure/plan/build_test.go
index e04a33d82..88fc05a11 100644
--- a/pkg/query/vectorized/measure/plan/build_test.go
+++ b/pkg/query/vectorized/measure/plan/build_test.go
@@ -218,14 +218,14 @@ func TestGroupByAgg_Schema_DropsTimestampAddsAggField(t 
*testing.T) {
        if out.TimestampIndex() >= 0 {
                t.Fatalf("aggregation output must drop timestamp (D2); got 
index %d", out.TimestampIndex())
        }
-       // 2 columns: svc key + value_sum agg result.
+       // 2 columns: svc key + value agg result (row-path parity name).
        if len(out.Columns) != 2 {
                t.Fatalf("want 2 output columns (key + agg result), got %d", 
len(out.Columns))
        }
        if out.Columns[0].Name != "svc" {
                t.Fatalf("col 0 should be the svc key, got %s", 
out.Columns[0].Name)
        }
-       if out.Columns[1].Name != "value_sum" {
-               t.Fatalf("col 1 should be value_sum, got %s", 
out.Columns[1].Name)
+       if out.Columns[1].Name != "value" {
+               t.Fatalf("col 1 should be 'value' (row-path parity), got %s", 
out.Columns[1].Name)
        }
 }
diff --git a/pkg/query/vectorized/measure/plan/executor_test.go 
b/pkg/query/vectorized/measure/plan/executor_test.go
index 81e170ab5..a0bd86d59 100644
--- a/pkg/query/vectorized/measure/plan/executor_test.go
+++ b/pkg/query/vectorized/measure/plan/executor_test.go
@@ -110,8 +110,8 @@ func 
TestExecute_GroupByAgg_EmitsAggregatedRowsWithNilTimestamp(t *testing.T) {
                        t.Fatalf("want one Tag 'svc', got %+v", tags)
                }
                svc := tags[0].Value.GetStr().GetValue()
-               if len(idp.DataPoint.Fields) != 1 || 
idp.DataPoint.Fields[0].Name != "value_sum" {
-                       t.Fatalf("want one Field 'value_sum', got %+v", 
idp.DataPoint.Fields)
+               if len(idp.DataPoint.Fields) != 1 || 
idp.DataPoint.Fields[0].Name != "value" {
+                       t.Fatalf("want one Field 'value' (row-path parity), got 
%+v", idp.DataPoint.Fields)
                }
                bySvc[svc] = idp.DataPoint.Fields[0].Value.GetInt().GetValue()
        }
diff --git a/pkg/query/vectorized/measure/plan_test.go 
b/pkg/query/vectorized/measure/plan_test.go
index 0e9ff8370..ed4b7e4ea 100644
--- a/pkg/query/vectorized/measure/plan_test.go
+++ b/pkg/query/vectorized/measure/plan_test.go
@@ -63,31 +63,35 @@ func 
TestBuildOperators_GroupByPlusAgg_EmitsBatchAggregation(t *testing.T) {
        }
 }
 
-func TestBuildOperators_AggOutputName_IsFieldUnderscoreFunc(t *testing.T) {
-       cases := []struct {
-               want string
-               fn   modelv1.AggregationFunction
-       }{
-               {"value_sum", 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM},
-               {"value_count", 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_COUNT},
-               {"value_min", 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_MIN},
-               {"value_max", 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_MAX},
-               {"value_mean", 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_MEAN},
-       }
-       for _, c := range cases {
+// TestBuildOperators_AggOutputName_InheritsInputFieldName pins the
+// G8d.2 row-path-parity name: the agg result column reuses the input
+// field name (e.g. "value") for every AggFunc, matching the row-path
+// aggGroupIterator.Current() that emits a single DataPoint_Field named
+// after the original input field. Any auto-derived "<field>_<func>"
+// suffix would break proto.Equal parity in the integration suite.
+func TestBuildOperators_AggOutputName_InheritsInputFieldName(t *testing.T) {
+       fns := []modelv1.AggregationFunction{
+               modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM,
+               modelv1.AggregationFunction_AGGREGATION_FUNCTION_COUNT,
+               modelv1.AggregationFunction_AGGREGATION_FUNCTION_MIN,
+               modelv1.AggregationFunction_AGGREGATION_FUNCTION_MAX,
+               modelv1.AggregationFunction_AGGREGATION_FUNCTION_MEAN,
+       }
+       const wantName = "value"
+       for _, fn := range fns {
                opts := model.MeasureQueryOptions{
                        GroupBy: &model.MeasureGroupBy{TagFamily: "default", 
TagNames: []string{"svc"}},
-                       Agg:     &model.MeasureAgg{FieldName: "value", Func: 
c.fn},
+                       Agg:     &model.MeasureAgg{FieldName: "value", Func: 
fn},
                }
                ops, err := BuildOperators(opts, planSchema(), 
vectorized.NewMemoryTracker(1<<20), 1024)
                if err != nil {
-                       t.Fatalf("%v: BuildOperators error: %v", c.fn, err)
+                       t.Fatalf("%v: BuildOperators error: %v", fn, err)
                }
                agg := ops[0].(*BatchAggregation)
-               // Output schema's last column is the agg result; its Name is 
the auto-derived output name.
+               // Output schema layout: key columns then the agg result column.
                got := 
agg.OutputSchema().Columns[len(agg.OutputSchema().Columns)-1].Name
-               if got != c.want {
-                       t.Fatalf("%v: want output column name %q, got %q", 
c.fn, c.want, got)
+               if got != wantName {
+                       t.Fatalf("%v: want output column name %q (row-path 
parity), got %q", fn, wantName, got)
                }
        }
 }
diff --git a/test/integration/standalone/query/vectorized_test.go 
b/test/integration/standalone/query/vectorized_test.go
index abd037352..f577b7fac 100644
--- a/test/integration/standalone/query/vectorized_test.go
+++ b/test/integration/standalone/query/vectorized_test.go
@@ -89,6 +89,11 @@ var _ = ginkgo.Describe("vectorized parity", ginkgo.Ordered, 
func() {
                config := setup.PropertyClusterConfig(dfWriter)
                addr, _, closeFn := setup.ClosableStandalone(config, path, 
ports,
                        "--measure-vectorized-enabled=true",
+                       // G8d.3 will add 
--measure-vectorized-aggregation-enabled=true
+                       // here once the MAX-specific egress parity gap is 
closed.
+                       // 8 of 9 GroupBy+Agg cases already pass with the gate 
on; the
+                       // remaining failure (group_max timeout) is tracked 
separately
+                       // before this flag flips in the parity gate too.
                )
                stopFn = func() {
                        closeFn()

Reply via email to