This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch vectorized-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit cb61bca97cd85ee3ffe76fbb9e9543b389351304
Author: Hongtao Gao <[email protected]>
AuthorDate: Wed May 13 09:19:26 2026 +0000

    feat(query/measure): MeasureGroupBy/Agg options and BuildOperators planner (G7d)
    
    `MeasureQueryOptions` gains `GroupBy *MeasureGroupBy` and `Agg *MeasureAgg`
    fields. v1 matches the existing singular `QueryRequest.agg` proto field;
    multi-agg per query is not supported. No proto change.
    
    `BuildOperators` translates `(opts, schema, tracker, batchSize)` into a
    list of `BreakerOperator` to chain after the scan source:
    
    - GroupBy + Agg → one BatchAggregation (its own keyIndex map handles
      grouping and folding; a separate BatchGroupBy would double the row
      materialization).
    - GroupBy without Agg → error (semantics are ambiguous in v1).
    - Agg without GroupBy → error (scalar reduce deferred).
    - Neither → empty slice (caller emits raw rows).
    
    Output column name is auto-derived as `<field>_<func>` (e.g. `value_sum`,
    `latency_count`). `protoAggFuncToInternal` maps the proto
    `AggregationFunction` enum to the internal `AggFunc` constant; UNSPECIFIED
    is rejected.
    
    Unit tests cover: empty options, GroupBy+Agg, every supported function,
    both validation errors, unknown groupby tag, unknown agg field, nil
    tracker, and multi-key GroupBy preserving TagNames order.
---
 pkg/query/model/model.go                  |  18 +++
 pkg/query/vectorized/measure/plan.go      | 167 +++++++++++++++++++++++++++
 pkg/query/vectorized/measure/plan_test.go | 185 ++++++++++++++++++++++++++++++
 3 files changed, 370 insertions(+)

diff --git a/pkg/query/model/model.go b/pkg/query/model/model.go
index 246d8c297..bb7e8efae 100644
--- a/pkg/query/model/model.go
+++ b/pkg/query/model/model.go
@@ -55,11 +55,29 @@ type TagProjection struct {
        Names  []string
 }
 
+// MeasureGroupBy describes a GroupBy clause for a measure query. v1 supports
+// a single tag family; each entry in TagNames is a key column. An empty
+// TagNames slice means the query carries no GroupBy clause.
+type MeasureGroupBy struct {
+       // TagFamily is the one tag family every name in TagNames belongs to.
+       TagFamily string
+       // TagNames lists the grouping key tags, in key-column order.
+       TagNames  []string
+}
+
+// MeasureAgg describes a single aggregation for a measure query. v1 supports
+// one aggregation per query — matches the singular QueryRequest.agg proto
+// field. FieldName must reference a field in
+// MeasureQueryOptions.FieldProjection.
+type MeasureAgg struct {
+       // FieldName is the name of the field the aggregation reads.
+       FieldName string
+       // Func is the aggregation function, expressed as the proto enum.
+       Func      modelv1.AggregationFunction
+}
+
 // MeasureQueryOptions is the options of a measure query.
 type MeasureQueryOptions struct {
        Query           index.Query
        TimeRange       *timestamp.TimeRange
        Order           *index.OrderBy
+       GroupBy         *MeasureGroupBy
+       Agg             *MeasureAgg
        Name            string
        Entities        [][]*modelv1.TagValue
        TagProjection   []TagProjection
diff --git a/pkg/query/vectorized/measure/plan.go 
b/pkg/query/vectorized/measure/plan.go
new file mode 100644
index 000000000..1cd0098ad
--- /dev/null
+++ b/pkg/query/vectorized/measure/plan.go
@@ -0,0 +1,167 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "fmt"
+       "strings"
+
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query/model"
+       "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+)
+
+// aggEntrySize is the conservative per-new-group memory estimate passed to
+// NewBatchAggregation (presumably in bytes — confirm against MemoryTracker).
+// Covers map entry + bucket struct + key/keyCol allocations; calibrated
+// against G7g bench (TBD).
+const aggEntrySize int64 = 512
+
+// BuildOperators translates the GroupBy + Agg slice of MeasureQueryOptions
+// into a list of BreakerOperators that, chained after the scan source, form
+// the vectorized aggregation pipeline.
+//
+// A GroupBy counts as set only when it is non-nil, names a tag family, and
+// lists at least one tag name. An Agg counts as set when it is non-nil.
+//
+// Routing rules (v1):
+//   - GroupBy + Agg both set → emit a single BatchAggregation. The operator's
+//     own keyIndex map produces per-group buckets and folds the agg slot;
+//     a separate BatchGroupBy would double the row materialization.
+//   - GroupBy set without Agg → unsupported. The proto field is paired with
+//     `agg` for the existing TopN path; without Agg the result semantic is
+//     ambiguous (selection? deduplication?). Return an error.
+//   - Agg set without GroupBy → unsupported. Scalar reduce → single output
+//     row is deferred to a future increment.
+//   - Neither set → empty operator list; caller emits raw rows.
+//
+// schema describes the columns produced by the scan source; GroupBy tags and
+// the Agg field are resolved to column indices against it. tracker is the
+// per-pipeline MemoryTracker (G7a). Both must be non-nil when any operator is
+// emitted, and batchSize, which controls the operator's output pagination,
+// must be positive. None of the three is inspected when no operator is
+// emitted.
+//
+// An error is returned for an unsupported GroupBy/Agg combination, a nil
+// schema or tracker, a non-positive batchSize, a GroupBy tag or Agg field
+// missing from schema, or an unspecified/unknown aggregation function.
+func BuildOperators(
+       opts model.MeasureQueryOptions, schema *vectorized.BatchSchema,
+       tracker *vectorized.MemoryTracker, batchSize int,
+) ([]vectorized.BreakerOperator, error) {
+       hasGroupBy := opts.GroupBy != nil && opts.GroupBy.TagFamily != "" && len(opts.GroupBy.TagNames) > 0
+       hasAgg := opts.Agg != nil
+
+       // Routing first, then preconditions that only matter once an operator
+       // is actually going to be built.
+       switch {
+       case !hasGroupBy && !hasAgg:
+               return nil, nil
+       case !hasAgg:
+               return nil, fmt.Errorf("vectorized.measure: GroupBy without Agg is not supported in v1")
+       case !hasGroupBy:
+               return nil, fmt.Errorf("vectorized.measure: Agg without GroupBy (scalar reduce) is not supported in v1")
+       case schema == nil:
+               // The column lookups below dereference schema; fail cleanly
+               // instead of panicking on a nil pointer.
+               return nil, fmt.Errorf("vectorized.measure: BuildOperators requires a non-nil BatchSchema")
+       case tracker == nil:
+               return nil, fmt.Errorf("vectorized.measure: BuildOperators requires a non-nil shared MemoryTracker")
+       case batchSize <= 0:
+               return nil, fmt.Errorf("vectorized.measure: batchSize must be > 0, got %d", batchSize)
+       }
+
+       keyIndices, keyErr := lookupGroupByKeyIndices(schema, opts.GroupBy)
+       if keyErr != nil {
+               return nil, keyErr
+       }
+
+       fieldIdx, fieldErr := lookupFieldColumnIndex(schema, opts.Agg.FieldName)
+       if fieldErr != nil {
+               return nil, fieldErr
+       }
+
+       aggFn, fnErr := protoAggFuncToInternal(opts.Agg.Func)
+       if fnErr != nil {
+               return nil, fnErr
+       }
+
+       // v1 carries exactly one aggregation; its output column is named
+       // <field>_<func>.
+       spec := AggSpec{
+               Func:     aggFn,
+               InputCol: fieldIdx,
+               Output:   aggOutputName(opts.Agg.FieldName, aggFn),
+       }
+       agg := NewBatchAggregation(schema, keyIndices, []AggSpec{spec},
+               AggModeAll, batchSize, tracker, aggEntrySize)
+       return []vectorized.BreakerOperator{agg}, nil
+}
+
+// lookupGroupByKeyIndices maps every GroupBy tag name onto the position of
+// the matching tag column in schema, keeping the order of gb.TagNames. A
+// column matches when it is a tag column of gb.TagFamily carrying that name;
+// the first match wins. Any name without a match yields an error.
+func lookupGroupByKeyIndices(schema *vectorized.BatchSchema, gb *model.MeasureGroupBy) ([]int, error) {
+       resolved := make([]int, 0, len(gb.TagNames))
+       for _, wanted := range gb.TagNames {
+               found := false
+               for pos, col := range schema.Columns {
+                       if col.Role != vectorized.RoleTag || col.TagFamily != gb.TagFamily || col.Name != wanted {
+                               continue
+                       }
+                       resolved = append(resolved, pos)
+                       found = true
+                       break
+               }
+               if !found {
+                       return nil, fmt.Errorf("vectorized.measure: GroupBy tag %s.%s not present in schema", gb.TagFamily, wanted)
+               }
+       }
+       return resolved, nil
+}
+
+// lookupFieldColumnIndex returns the position of the first field column
+// (Role == RoleField) in schema whose name equals name. Columns of any other
+// role are skipped. When nothing matches it returns -1 and an error naming
+// the field.
+func lookupFieldColumnIndex(schema *vectorized.BatchSchema, name string) (int, error) {
+       for pos, col := range schema.Columns {
+               if col.Role != vectorized.RoleField || col.Name != name {
+                       continue
+               }
+               return pos, nil
+       }
+       return -1, fmt.Errorf("vectorized.measure: Agg field %q not present in schema", name)
+}
+
+// protoAggFuncToInternal converts the proto AggregationFunction enum into the
+// matching internal AggFunc constant. UNSPECIFIED gets its own error because
+// an aggregation must name a concrete function; any other value outside the
+// supported set is reported as unknown.
+func protoAggFuncToInternal(f modelv1.AggregationFunction) (AggFunc, error) {
+       if f == modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED {
+               return 0, fmt.Errorf("vectorized.measure: Agg.Function is UNSPECIFIED")
+       }
+
+       var mapped AggFunc
+       switch f {
+       case modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM:
+               mapped = AggSum
+       case modelv1.AggregationFunction_AGGREGATION_FUNCTION_COUNT:
+               mapped = AggCount
+       case modelv1.AggregationFunction_AGGREGATION_FUNCTION_MIN:
+               mapped = AggMin
+       case modelv1.AggregationFunction_AGGREGATION_FUNCTION_MAX:
+               mapped = AggMax
+       case modelv1.AggregationFunction_AGGREGATION_FUNCTION_MEAN:
+               mapped = AggMean
+       default:
+               return 0, fmt.Errorf("vectorized.measure: unknown AggregationFunction %v", f)
+       }
+       return mapped, nil
+}
+
+// aggOutputName builds the name of the aggregation result column as
+// <field>_<func>, with the function rendered in lowercase (e.g. "value_sum").
+func aggOutputName(fieldName string, fn AggFunc) string {
+       suffixByFunc := map[AggFunc]string{
+               AggSum:   "sum",
+               AggCount: "count",
+               AggMin:   "min",
+               AggMax:   "max",
+               AggMean:  "mean",
+       }
+       // A function missing from the table yields an empty suffix, so the
+       // result is "<field>_".
+       parts := []string{fieldName, suffixByFunc[fn]}
+       return strings.Join(parts, "_")
+}
diff --git a/pkg/query/vectorized/measure/plan_test.go 
b/pkg/query/vectorized/measure/plan_test.go
new file mode 100644
index 000000000..0e9ff8370
--- /dev/null
+++ b/pkg/query/vectorized/measure/plan_test.go
@@ -0,0 +1,185 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "strings"
+       "testing"
+
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query/model"
+       "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+)
+
+// planSchema returns the schema shared by the plan tests: a string tag "svc"
+// in tag family "default" (usable as a GroupBy key) followed by an int64
+// field "value" (usable as the Agg input).
+func planSchema() *vectorized.BatchSchema {
+       svcTag := vectorized.ColumnDef{
+               Role:      vectorized.RoleTag,
+               TagFamily: "default",
+               Name:      "svc",
+               Type:      vectorized.ColumnTypeString,
+       }
+       valueField := vectorized.ColumnDef{
+               Role: vectorized.RoleField,
+               Name: "value",
+               Type: vectorized.ColumnTypeInt64,
+       }
+       return vectorized.NewBatchSchema([]vectorized.ColumnDef{svcTag, valueField})
+}
+
+// TestBuildOperators_NoGroupByNoAgg_ReturnsEmpty checks that options carrying
+// neither GroupBy nor Agg plan to an empty operator list without an error.
+func TestBuildOperators_NoGroupByNoAgg_ReturnsEmpty(t *testing.T) {
+       var emptyOpts model.MeasureQueryOptions
+       tracker := vectorized.NewMemoryTracker(1 << 20)
+
+       planned, planErr := BuildOperators(emptyOpts, planSchema(), tracker, 1024)
+       if planErr != nil {
+               t.Fatalf("empty opts should not error: %v", planErr)
+       }
+       if count := len(planned); count != 0 {
+               t.Fatalf("empty opts should produce no operators, got %d", count)
+       }
+}
+
+// TestBuildOperators_GroupByPlusAgg_EmitsBatchAggregation checks that a
+// GroupBy paired with an Agg plans to exactly one *BatchAggregation.
+func TestBuildOperators_GroupByPlusAgg_EmitsBatchAggregation(t *testing.T) {
+       groupBy := &model.MeasureGroupBy{TagFamily: "default", TagNames: []string{"svc"}}
+       sumAgg := &model.MeasureAgg{FieldName: "value", Func: modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM}
+       tracker := vectorized.NewMemoryTracker(1 << 20)
+
+       planned, planErr := BuildOperators(model.MeasureQueryOptions{GroupBy: groupBy, Agg: sumAgg}, planSchema(), tracker, 1024)
+       if planErr != nil {
+               t.Fatalf("BuildOperators error: %v", planErr)
+       }
+       if count := len(planned); count != 1 {
+               t.Fatalf("GroupBy+Agg should emit 1 operator (BatchAggregation), got %d", count)
+       }
+       switch planned[0].(type) {
+       case *BatchAggregation:
+               // Expected operator type.
+       default:
+               t.Fatalf("operator must be *BatchAggregation, got %T", planned[0])
+       }
+}
+
+// TestBuildOperators_AggOutputName_IsFieldUnderscoreFunc checks, for every
+// supported aggregation function, that the planned operator's last output
+// column is named <field>_<func>. Each function runs as its own subtest so
+// one failure does not hide the others, and the operator shape is verified
+// before it is indexed so a regression fails the test instead of panicking.
+func TestBuildOperators_AggOutputName_IsFieldUnderscoreFunc(t *testing.T) {
+       cases := []struct {
+               want string
+               fn   modelv1.AggregationFunction
+       }{
+               {"value_sum", modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM},
+               {"value_count", modelv1.AggregationFunction_AGGREGATION_FUNCTION_COUNT},
+               {"value_min", modelv1.AggregationFunction_AGGREGATION_FUNCTION_MIN},
+               {"value_max", modelv1.AggregationFunction_AGGREGATION_FUNCTION_MAX},
+               {"value_mean", modelv1.AggregationFunction_AGGREGATION_FUNCTION_MEAN},
+       }
+       for _, c := range cases {
+               // Subtests run synchronously (no t.Parallel), so capturing c is safe.
+               t.Run(c.want, func(t *testing.T) {
+                       opts := model.MeasureQueryOptions{
+                               GroupBy: &model.MeasureGroupBy{TagFamily: "default", TagNames: []string{"svc"}},
+                               Agg:     &model.MeasureAgg{FieldName: "value", Func: c.fn},
+                       }
+                       ops, err := BuildOperators(opts, planSchema(), vectorized.NewMemoryTracker(1<<20), 1024)
+                       if err != nil {
+                               t.Fatalf("%v: BuildOperators error: %v", c.fn, err)
+                       }
+                       if len(ops) != 1 {
+                               t.Fatalf("%v: want exactly 1 operator, got %d", c.fn, len(ops))
+                       }
+                       agg, ok := ops[0].(*BatchAggregation)
+                       if !ok {
+                               t.Fatalf("%v: operator must be *BatchAggregation, got %T", c.fn, ops[0])
+                       }
+                       // Output schema's last column is the agg result; its Name is the auto-derived output name.
+                       cols := agg.OutputSchema().Columns
+                       if len(cols) == 0 {
+                               t.Fatalf("%v: output schema has no columns", c.fn)
+                       }
+                       got := cols[len(cols)-1].Name
+                       if got != c.want {
+                               t.Fatalf("%v: want output column name %q, got %q", c.fn, c.want, got)
+                       }
+               })
+       }
+}
+
+// TestBuildOperators_GroupByWithoutAgg_Errors checks that a GroupBy with no
+// accompanying Agg is rejected.
+func TestBuildOperators_GroupByWithoutAgg_Errors(t *testing.T) {
+       groupOnly := model.MeasureQueryOptions{
+               GroupBy: &model.MeasureGroupBy{
+                       TagFamily: "default",
+                       TagNames:  []string{"svc"},
+               },
+       }
+       tracker := vectorized.NewMemoryTracker(1 << 20)
+       if _, planErr := BuildOperators(groupOnly, planSchema(), tracker, 1024); planErr == nil {
+               t.Fatal("GroupBy without Agg must error in v1")
+       }
+}
+
+// TestBuildOperators_AggWithoutGroupBy_Errors checks that an Agg with no
+// GroupBy (a scalar reduce) is rejected.
+func TestBuildOperators_AggWithoutGroupBy_Errors(t *testing.T) {
+       aggOnly := model.MeasureQueryOptions{
+               Agg: &model.MeasureAgg{
+                       FieldName: "value",
+                       Func:      modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM,
+               },
+       }
+       tracker := vectorized.NewMemoryTracker(1 << 20)
+       if _, planErr := BuildOperators(aggOnly, planSchema(), tracker, 1024); planErr == nil {
+               t.Fatal("Agg without GroupBy (scalar reduce) must error in v1")
+       }
+}
+
+// TestBuildOperators_UnknownGroupByTag_Errors checks that a GroupBy tag absent
+// from the schema is rejected with an error that names the tag.
+func TestBuildOperators_UnknownGroupByTag_Errors(t *testing.T) {
+       groupBy := &model.MeasureGroupBy{TagFamily: "default", TagNames: []string{"missing"}}
+       sumAgg := &model.MeasureAgg{FieldName: "value", Func: modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM}
+       tracker := vectorized.NewMemoryTracker(1 << 20)
+
+       _, planErr := BuildOperators(model.MeasureQueryOptions{GroupBy: groupBy, Agg: sumAgg}, planSchema(), tracker, 1024)
+       switch {
+       case planErr == nil:
+               t.Fatal("unknown groupby tag must error")
+       case !strings.Contains(planErr.Error(), "missing"):
+               t.Fatalf("error should name the missing tag, got %v", planErr)
+       }
+}
+
+// TestBuildOperators_UnknownAggField_Errors checks that an Agg field absent
+// from the schema is rejected with an error that names the field.
+func TestBuildOperators_UnknownAggField_Errors(t *testing.T) {
+       groupBy := &model.MeasureGroupBy{TagFamily: "default", TagNames: []string{"svc"}}
+       ghostAgg := &model.MeasureAgg{FieldName: "ghost", Func: modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM}
+       tracker := vectorized.NewMemoryTracker(1 << 20)
+
+       _, planErr := BuildOperators(model.MeasureQueryOptions{GroupBy: groupBy, Agg: ghostAgg}, planSchema(), tracker, 1024)
+       switch {
+       case planErr == nil:
+               t.Fatal("unknown agg field must error")
+       case !strings.Contains(planErr.Error(), "ghost"):
+               t.Fatalf("error should name the missing field, got %v", planErr)
+       }
+}
+
+// TestBuildOperators_AggUnspecified_Errors checks that an Agg whose function
+// is the UNSPECIFIED enum value is rejected.
+func TestBuildOperators_AggUnspecified_Errors(t *testing.T) {
+       groupBy := &model.MeasureGroupBy{TagFamily: "default", TagNames: []string{"svc"}}
+       unspecifiedAgg := &model.MeasureAgg{
+               FieldName: "value",
+               Func:      modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED,
+       }
+       tracker := vectorized.NewMemoryTracker(1 << 20)
+
+       queryOpts := model.MeasureQueryOptions{GroupBy: groupBy, Agg: unspecifiedAgg}
+       if _, planErr := BuildOperators(queryOpts, planSchema(), tracker, 1024); planErr == nil {
+               t.Fatal("UNSPECIFIED Agg.Func must error")
+       }
+}
+
+// TestBuildOperators_NilTracker_Errors checks that an otherwise valid
+// GroupBy+Agg request is rejected when no MemoryTracker is supplied.
+func TestBuildOperators_NilTracker_Errors(t *testing.T) {
+       groupBy := &model.MeasureGroupBy{TagFamily: "default", TagNames: []string{"svc"}}
+       sumAgg := &model.MeasureAgg{FieldName: "value", Func: modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM}
+
+       queryOpts := model.MeasureQueryOptions{GroupBy: groupBy, Agg: sumAgg}
+       if _, planErr := BuildOperators(queryOpts, planSchema(), nil, 1024); planErr == nil {
+               t.Fatal("nil tracker must error when operators are emitted")
+       }
+}
+
+// TestBuildOperators_MultiKeyGroupBy_PreservesKeyOrder checks that a GroupBy
+// over two tags yields an operator whose first two output columns are the
+// keys, in TagNames order. The operator count, its concrete type, and the
+// output column count are verified before indexing so that a regression is
+// reported as a test failure rather than a panic.
+func TestBuildOperators_MultiKeyGroupBy_PreservesKeyOrder(t *testing.T) {
+       schema := vectorized.NewBatchSchema([]vectorized.ColumnDef{
+               {Role: vectorized.RoleTag, TagFamily: "default", Name: "region", Type: vectorized.ColumnTypeString},
+               {Role: vectorized.RoleTag, TagFamily: "default", Name: "svc", Type: vectorized.ColumnTypeString},
+               {Role: vectorized.RoleField, Name: "value", Type: vectorized.ColumnTypeInt64},
+       })
+       opts := model.MeasureQueryOptions{
+               GroupBy: &model.MeasureGroupBy{TagFamily: "default", TagNames: []string{"region", "svc"}},
+               Agg:     &model.MeasureAgg{FieldName: "value", Func: modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM},
+       }
+       ops, err := BuildOperators(opts, schema, vectorized.NewMemoryTracker(1<<20), 1024)
+       if err != nil {
+               t.Fatalf("BuildOperators error: %v", err)
+       }
+       if len(ops) != 1 {
+               t.Fatalf("multi-key GroupBy+Agg should emit 1 operator, got %d", len(ops))
+       }
+       agg, ok := ops[0].(*BatchAggregation)
+       if !ok {
+               t.Fatalf("operator must be *BatchAggregation, got %T", ops[0])
+       }
+       // First two output columns are the keys, in TagNames order.
+       out := agg.OutputSchema().Columns
+       if len(out) < 2 {
+               t.Fatalf("output schema should carry at least the 2 key columns, got %d columns", len(out))
+       }
+       if out[0].Name != "region" || out[1].Name != "svc" {
+               t.Fatalf("output columns 0/1 should be region/svc (TagNames order), got %s/%s", out[0].Name, out[1].Name)
+       }
+}

Reply via email to