This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch vectorized-query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit cb61bca97cd85ee3ffe76fbb9e9543b389351304 Author: Hongtao Gao <[email protected]> AuthorDate: Wed May 13 09:19:26 2026 +0000 feat(query/measure): MeasureGroupBy/Agg options and BuildOperators planner (G7d) `MeasureQueryOptions` gains `GroupBy *MeasureGroupBy` and `Agg *MeasureAgg` fields. v1 matches the existing singular `QueryRequest.agg` proto field; multi-agg per query is not supported. No proto change. `BuildOperators` translates `(opts, schema, tracker, batchSize)` into a list of `BreakerOperator` to chain after the scan source: - GroupBy + Agg → one BatchAggregation (its own keyIndex map handles grouping and folding; a separate BatchGroupBy would double the row materialization). - GroupBy without Agg → error (semantic ambiguous in v1). - Agg without GroupBy → error (scalar reduce deferred). - Neither → empty slice (caller emits raw rows). Output column name is auto-derived as `<field>_<func>` (e.g. `value_sum`, `latency_count`). `protoAggFuncToInternal` maps the proto `AggregationFunction` enum to the internal `AggFunc` constant; UNSPECIFIED is rejected. Unit tests cover: empty options, GroupBy+Agg, every supported function, both validation errors, unknown groupby tag, unknown agg field, nil tracker, and multi-key GroupBy preserving TagNames order. --- pkg/query/model/model.go | 18 +++ pkg/query/vectorized/measure/plan.go | 167 +++++++++++++++++++++++++++ pkg/query/vectorized/measure/plan_test.go | 185 ++++++++++++++++++++++++++++++ 3 files changed, 370 insertions(+) diff --git a/pkg/query/model/model.go b/pkg/query/model/model.go index 246d8c297..bb7e8efae 100644 --- a/pkg/query/model/model.go +++ b/pkg/query/model/model.go @@ -55,11 +55,29 @@ type TagProjection struct { Names []string } +// MeasureGroupBy describes a GroupBy clause for a measure query. v1 supports +// a single tag family; each entry in TagNames is a key column. An empty +// TagNames slice means the query carries no GroupBy clause. +type MeasureGroupBy struct { + TagFamily string + TagNames []string +} + +// MeasureAgg describes a single aggregation for a measure query. v1 supports +// one aggregation per query — matches the singular QueryRequest.agg proto +// field. FieldName must reference a field in MeasureQueryOptions.FieldProjection. +type MeasureAgg struct { + FieldName string + Func modelv1.AggregationFunction +} + // MeasureQueryOptions is the options of a measure query. type MeasureQueryOptions struct { Query index.Query TimeRange *timestamp.TimeRange Order *index.OrderBy + GroupBy *MeasureGroupBy + Agg *MeasureAgg Name string Entities [][]*modelv1.TagValue TagProjection []TagProjection diff --git a/pkg/query/vectorized/measure/plan.go b/pkg/query/vectorized/measure/plan.go new file mode 100644 index 000000000..1cd0098ad --- /dev/null +++ b/pkg/query/vectorized/measure/plan.go @@ -0,0 +1,167 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package measure + +import ( + "fmt" + "strings" + + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/pkg/query/model" + "github.com/apache/skywalking-banyandb/pkg/query/vectorized" +) + +// Conservative per-new-group memory estimate. Covers map entry + bucket +// struct + key/keyCol allocations; calibrated against G7g bench (TBD). +const aggEntrySize int64 = 512 + +// BuildOperators translates the GroupBy + Agg slice of MeasureQueryOptions +// into a list of BreakerOperators that, chained after the scan source, form +// the vectorized aggregation pipeline. +// +// Routing rules (v1): +// - GroupBy + Agg both set → emit a single BatchAggregation. The operator's +// own keyIndex map produces per-group buckets and folds the agg slot; +// a separate BatchGroupBy would double the row materialization. +// - GroupBy set without Agg → unsupported. The proto field is paired with +// `agg` for the existing TopN path; without Agg the result semantic is +// ambiguous (selection? deduplication?). Return an error. +// - Agg set without GroupBy → unsupported. Scalar reduce → single output +// row is deferred to a future increment. +// - Neither set → empty operator list; caller emits raw rows. +// +// tracker is the per-pipeline MemoryTracker (G7a); it must be non-nil when +// any operator is emitted so per-group reservations route to the shared +// budget. batchSize controls the operator's output pagination. +func BuildOperators( + opts model.MeasureQueryOptions, schema *vectorized.BatchSchema, + tracker *vectorized.MemoryTracker, batchSize int, +) ([]vectorized.BreakerOperator, error) { + hasGroupBy := opts.GroupBy != nil && opts.GroupBy.TagFamily != "" && len(opts.GroupBy.TagNames) > 0 + hasAgg := opts.Agg != nil + + if !hasGroupBy && !hasAgg { + return nil, nil + } + if hasGroupBy && !hasAgg { + return nil, fmt.Errorf("vectorized.measure: GroupBy without Agg is not supported in v1") + } + if hasAgg && !hasGroupBy { + return nil, fmt.Errorf("vectorized.measure: Agg without GroupBy (scalar reduce) is not supported in v1") + } + if tracker == nil { + return nil, fmt.Errorf("vectorized.measure: BuildOperators requires a non-nil shared MemoryTracker") + } + if batchSize <= 0 { + return nil, fmt.Errorf("vectorized.measure: batchSize must be > 0, got %d", batchSize) + } + + keyIndices, keyErr := lookupGroupByKeyIndices(schema, opts.GroupBy) + if keyErr != nil { + return nil, keyErr + } + + fieldIdx, fieldErr := lookupFieldColumnIndex(schema, opts.Agg.FieldName) + if fieldErr != nil { + return nil, fieldErr + } + + aggFn, fnErr := protoAggFuncToInternal(opts.Agg.Func) + if fnErr != nil { + return nil, fnErr + } + + spec := AggSpec{ + Func: aggFn, + InputCol: fieldIdx, + Output: aggOutputName(opts.Agg.FieldName, aggFn), + } + agg := NewBatchAggregation(schema, keyIndices, []AggSpec{spec}, + AggModeAll, batchSize, tracker, aggEntrySize) + return []vectorized.BreakerOperator{agg}, nil +} + +// lookupGroupByKeyIndices resolves each GroupBy tag name to its column index +// in schema. All names must exist within the configured tag family. +func lookupGroupByKeyIndices(schema *vectorized.BatchSchema, gb *model.MeasureGroupBy) ([]int, error) { + indices := make([]int, 0, len(gb.TagNames)) + for _, name := range gb.TagNames { + idx := -1 + for i, def := range schema.Columns { + if def.Role == vectorized.RoleTag && def.TagFamily == gb.TagFamily && def.Name == name { + idx = i + break + } + } + if idx < 0 { + return nil, fmt.Errorf("vectorized.measure: GroupBy tag %s.%s not present in schema", gb.TagFamily, name) + } + indices = append(indices, idx) + } + return indices, nil +} + +// lookupFieldColumnIndex resolves the agg input field name to its column +// index. Only field columns (Role == RoleField) are eligible. +func lookupFieldColumnIndex(schema *vectorized.BatchSchema, name string) (int, error) { + for i, def := range schema.Columns { + if def.Role == vectorized.RoleField && def.Name == name { + return i, nil + } + } + return -1, fmt.Errorf("vectorized.measure: Agg field %q not present in schema", name) +} + +// protoAggFuncToInternal maps the proto AggregationFunction enum to the +// internal AggFunc constant. UNSPECIFIED is rejected — Aggregation must +// name a concrete function. +func protoAggFuncToInternal(f modelv1.AggregationFunction) (AggFunc, error) { + switch f { + case modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM: + return AggSum, nil + case modelv1.AggregationFunction_AGGREGATION_FUNCTION_COUNT: + return AggCount, nil + case modelv1.AggregationFunction_AGGREGATION_FUNCTION_MIN: + return AggMin, nil + case modelv1.AggregationFunction_AGGREGATION_FUNCTION_MAX: + return AggMax, nil + case modelv1.AggregationFunction_AGGREGATION_FUNCTION_MEAN: + return AggMean, nil + case modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED: + return 0, fmt.Errorf("vectorized.measure: Agg.Function is UNSPECIFIED") + } + return 0, fmt.Errorf("vectorized.measure: unknown AggregationFunction %v", f) +} + +// aggOutputName derives the agg result column name: <field>_<func> (lowercase). +func aggOutputName(fieldName string, fn AggFunc) string { + suffix := "" + switch fn { + case AggSum: + suffix = "sum" + case AggCount: + suffix = "count" + case AggMin: + suffix = "min" + case AggMax: + suffix = "max" + case AggMean: + suffix = "mean" + } + return strings.Join([]string{fieldName, suffix}, "_") +} diff --git a/pkg/query/vectorized/measure/plan_test.go b/pkg/query/vectorized/measure/plan_test.go new file mode 100644 index 000000000..0e9ff8370 --- /dev/null +++ b/pkg/query/vectorized/measure/plan_test.go @@ -0,0 +1,185 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package measure + +import ( + "strings" + "testing" + + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/pkg/query/model" + "github.com/apache/skywalking-banyandb/pkg/query/vectorized" +) + +// planSchema is the fixture schema for plan_test: one groupby-eligible tag, +// one agg-eligible field. +func planSchema() *vectorized.BatchSchema { + return vectorized.NewBatchSchema([]vectorized.ColumnDef{ + {Role: vectorized.RoleTag, TagFamily: "default", Name: "svc", Type: vectorized.ColumnTypeString}, + {Role: vectorized.RoleField, Name: "value", Type: vectorized.ColumnTypeInt64}, + }) +} + +func TestBuildOperators_NoGroupByNoAgg_ReturnsEmpty(t *testing.T) { + ops, err := BuildOperators(model.MeasureQueryOptions{}, planSchema(), + vectorized.NewMemoryTracker(1<<20), 1024) + if err != nil { + t.Fatalf("empty opts should not error: %v", err) + } + if len(ops) != 0 { + t.Fatalf("empty opts should produce no operators, got %d", len(ops)) + } +} + +func TestBuildOperators_GroupByPlusAgg_EmitsBatchAggregation(t *testing.T) { + opts := model.MeasureQueryOptions{ + GroupBy: &model.MeasureGroupBy{TagFamily: "default", TagNames: []string{"svc"}}, + Agg: &model.MeasureAgg{FieldName: "value", Func: modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM}, + } + ops, err := BuildOperators(opts, planSchema(), vectorized.NewMemoryTracker(1<<20), 1024) + if err != nil { + t.Fatalf("BuildOperators error: %v", err) + } + if len(ops) != 1 { + t.Fatalf("GroupBy+Agg should emit 1 operator (BatchAggregation), got %d", len(ops)) + } + if _, ok := ops[0].(*BatchAggregation); !ok { + t.Fatalf("operator must be *BatchAggregation, got %T", ops[0]) + } +} + +func TestBuildOperators_AggOutputName_IsFieldUnderscoreFunc(t *testing.T) { + cases := []struct { + want string + fn modelv1.AggregationFunction + }{ + {"value_sum", modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM}, + {"value_count", modelv1.AggregationFunction_AGGREGATION_FUNCTION_COUNT}, + {"value_min", modelv1.AggregationFunction_AGGREGATION_FUNCTION_MIN}, + {"value_max", modelv1.AggregationFunction_AGGREGATION_FUNCTION_MAX}, + {"value_mean", modelv1.AggregationFunction_AGGREGATION_FUNCTION_MEAN}, + } + for _, c := range cases { + opts := model.MeasureQueryOptions{ + GroupBy: &model.MeasureGroupBy{TagFamily: "default", TagNames: []string{"svc"}}, + Agg: &model.MeasureAgg{FieldName: "value", Func: c.fn}, + } + ops, err := BuildOperators(opts, planSchema(), vectorized.NewMemoryTracker(1<<20), 1024) + if err != nil { + t.Fatalf("%v: BuildOperators error: %v", c.fn, err) + } + agg := ops[0].(*BatchAggregation) + // Output schema's last column is the agg result; its Name is the auto-derived output name. + got := agg.OutputSchema().Columns[len(agg.OutputSchema().Columns)-1].Name + if got != c.want { + t.Fatalf("%v: want output column name %q, got %q", c.fn, c.want, got) + } + } +} + +func TestBuildOperators_GroupByWithoutAgg_Errors(t *testing.T) { + opts := model.MeasureQueryOptions{ + GroupBy: &model.MeasureGroupBy{TagFamily: "default", TagNames: []string{"svc"}}, + } + _, err := BuildOperators(opts, planSchema(), vectorized.NewMemoryTracker(1<<20), 1024) + if err == nil { + t.Fatal("GroupBy without Agg must error in v1") + } +} + +func TestBuildOperators_AggWithoutGroupBy_Errors(t *testing.T) { + opts := model.MeasureQueryOptions{ + Agg: &model.MeasureAgg{FieldName: "value", Func: modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM}, + } + _, err := BuildOperators(opts, planSchema(), vectorized.NewMemoryTracker(1<<20), 1024) + if err == nil { + t.Fatal("Agg without GroupBy (scalar reduce) must error in v1") + } +} + +func TestBuildOperators_UnknownGroupByTag_Errors(t *testing.T) { + opts := model.MeasureQueryOptions{ + GroupBy: &model.MeasureGroupBy{TagFamily: "default", TagNames: []string{"missing"}}, + Agg: &model.MeasureAgg{FieldName: "value", Func: modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM}, + } + _, err := BuildOperators(opts, planSchema(), vectorized.NewMemoryTracker(1<<20), 1024) + if err == nil { + t.Fatal("unknown groupby tag must error") + } + if !strings.Contains(err.Error(), "missing") { + t.Fatalf("error should name the missing tag, got %v", err) + } +} + +func TestBuildOperators_UnknownAggField_Errors(t *testing.T) { + opts := model.MeasureQueryOptions{ + GroupBy: &model.MeasureGroupBy{TagFamily: "default", TagNames: []string{"svc"}}, + Agg: &model.MeasureAgg{FieldName: "ghost", Func: modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM}, + } + _, err := BuildOperators(opts, planSchema(), vectorized.NewMemoryTracker(1<<20), 1024) + if err == nil { + t.Fatal("unknown agg field must error") + } + if !strings.Contains(err.Error(), "ghost") { + t.Fatalf("error should name the missing field, got %v", err) + } +} + +func TestBuildOperators_AggUnspecified_Errors(t *testing.T) { + opts := model.MeasureQueryOptions{ + GroupBy: &model.MeasureGroupBy{TagFamily: "default", TagNames: []string{"svc"}}, + Agg: &model.MeasureAgg{FieldName: "value", Func: modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED}, + } + _, err := BuildOperators(opts, planSchema(), vectorized.NewMemoryTracker(1<<20), 1024) + if err == nil { + t.Fatal("UNSPECIFIED Agg.Func must error") + } +} + +func TestBuildOperators_NilTracker_Errors(t *testing.T) { + opts := model.MeasureQueryOptions{ + GroupBy: &model.MeasureGroupBy{TagFamily: "default", TagNames: []string{"svc"}}, + Agg: &model.MeasureAgg{FieldName: "value", Func: modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM}, + } + _, err := BuildOperators(opts, planSchema(), nil, 1024) + if err == nil { + t.Fatal("nil tracker must error when operators are emitted") + } +} + +func TestBuildOperators_MultiKeyGroupBy_PreservesKeyOrder(t *testing.T) { + schema := vectorized.NewBatchSchema([]vectorized.ColumnDef{ + {Role: vectorized.RoleTag, TagFamily: "default", Name: "region", Type: vectorized.ColumnTypeString}, + {Role: vectorized.RoleTag, TagFamily: "default", Name: "svc", Type: vectorized.ColumnTypeString}, + {Role: vectorized.RoleField, Name: "value", Type: vectorized.ColumnTypeInt64}, + }) + opts := model.MeasureQueryOptions{ + GroupBy: &model.MeasureGroupBy{TagFamily: "default", TagNames: []string{"region", "svc"}}, + Agg: &model.MeasureAgg{FieldName: "value", Func: modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM}, + } + ops, err := BuildOperators(opts, schema, vectorized.NewMemoryTracker(1<<20), 1024) + if err != nil { + t.Fatalf("BuildOperators error: %v", err) + } + agg := ops[0].(*BatchAggregation) + // First two output columns are the keys, in TagNames order. + out := agg.OutputSchema().Columns + if out[0].Name != "region" || out[1].Name != "svc" { + t.Fatalf("output columns 0/1 should be region/svc (TagNames order), got %s/%s", out[0].Name, out[1].Name) + } +}
