This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch vectorized-query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 89900e66112a31ed6a682356b416b9b07e8e9888
Author: Hongtao Gao <[email protected]>
AuthorDate: Wed May 13 09:57:00 2026 +0000

    feat(query/vectorized/measure/plan): vec analyzer (G8b)

    Analyze(req *measurev1.QueryRequest, schema *databasev1.Measure) (VecPlan, error)
    translates a measure query into the vec plan tree introduced in G8a. This
    is the vec counterpart of the deprecated pkg/query/logical/measure.Analyze —
    it produces vec plan nodes directly, with no logical→physical resolution
    step and no leaf substitution into a row plan.

    Plan shape:

        Limit
        └─ (Top?)
           └─ (GroupByAgg if group_by + agg both set)
              └─ Scan

    Coverage:
    - Static query parameters (Measure schema, time range, tag projection,
      field projection) populate Scan.Params at analyze time.
    - Runtime parameters that depend on the executor's MeasureExecutionContext
      (the resolved index.Query and the entity table) are left blank; the
      executor (G8c) fills them in immediately before invoking Build.
    - GroupBy + Agg are coalesced into a single GroupByAgg node — they pair to
      one BatchAggregation operator per G7d's planner.
    - Validation matches the G7d contract: GroupBy and Agg must come together;
      tag and field references must exist in the Measure schema; GroupBy
      tag_projection v1 supports a single tag family.
    - Top sits between GroupByAgg and Limit so the heap operates on aggregated
      rows when GroupBy+Agg is present.

    Tests cover every plan shape (bare Scan/Limit, GroupByAgg, Top, all three
    together), every validation error (GroupBy without Agg, Agg without
    GroupBy, unknown groupby tag, unknown agg field, nil request, nil schema),
    the default-limit fallback when QueryRequest.Limit is zero, and PrintTree
    rendering for debugging.
--- pkg/query/vectorized/measure/plan/analyzer.go | 209 +++++++++++++++ pkg/query/vectorized/measure/plan/analyzer_test.go | 291 +++++++++++++++++++++ 2 files changed, 500 insertions(+) diff --git a/pkg/query/vectorized/measure/plan/analyzer.go b/pkg/query/vectorized/measure/plan/analyzer.go new file mode 100644 index 000000000..69632f5ea --- /dev/null +++ b/pkg/query/vectorized/measure/plan/analyzer.go @@ -0,0 +1,209 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package plan + +import ( + "fmt" + + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/pkg/query/model" + measure "github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure" + "github.com/apache/skywalking-banyandb/pkg/timestamp" +) + +// defaultLimit mirrors the row-path measure_analyzer.go default. Matched so +// vec and row produce identical paging for requests that don't set Limit. +const defaultLimit uint32 = 100 + +// Analyze translates a measurev1.QueryRequest + its Measure schema into a +// VecPlan tree. 
It is the vec counterpart of pkg/query/logical/measure +// (deprecated) but produces vec plan nodes — there is no leaf +// substitution into a row plan. +// +// The returned tree is structural: it carries the static (proto-derived) +// query parameters in `Scan.Params` and the BatchSchema for downstream +// nodes to consult. Runtime fields that depend on the executor's +// MeasureExecutionContext — the resolved `index.Query` and the entity +// table — are NOT populated here; the executor (G8c) fills them in +// before invoking Build. +// +// Errors are returned for: +// - nil schema +// - tag/field projection naming columns not in the schema +// - GroupBy referencing a tag absent from the schema +// - Agg referencing a field absent from the schema +// - GroupBy set without Agg (raw groupby not supported in v1) +// - Agg set without GroupBy (scalar reduce not supported in v1) +func Analyze(req *measurev1.QueryRequest, measureSchema *databasev1.Measure) (VecPlan, error) { + if req == nil { + return nil, fmt.Errorf("plan.Analyze: nil request") + } + if measureSchema == nil { + return nil, fmt.Errorf("plan.Analyze: nil Measure schema") + } + + tagProjection := buildTagProjection(req) + fieldProjection := req.GetFieldProjection().GetNames() + opts := model.MeasureQueryOptions{ + TagProjection: tagProjection, + FieldProjection: fieldProjection, + } + batchSchema, schemaErr := measure.BuildBatchSchema(measureSchema, opts) + if schemaErr != nil { + return nil, fmt.Errorf("plan.Analyze: %w", schemaErr) + } + + var tr *timestamp.TimeRange + if t := req.GetTimeRange(); t != nil { + r := timestamp.NewInclusiveTimeRange(t.GetBegin().AsTime(), t.GetEnd().AsTime()) + tr = &r + } + + var plan VecPlan = NewScan(batchSchema, ScanParams{ + Measure: measureSchema, + TimeRange: tr, + TagProjection: tagProjection, + FieldProjection: fieldProjection, + }) + + // GroupBy + Agg coalesced into a single GroupByAgg node. Validation + // matches the G7d planner's contract. 
+ hasGroupBy := req.GetGroupBy() != nil + hasAgg := req.GetAgg() != nil + switch { + case hasGroupBy && hasAgg: + gb, aggSpec, validateErr := translateGroupByAgg(req, measureSchema) + if validateErr != nil { + return nil, validateErr + } + gba, gbaErr := NewGroupByAgg(plan, gb, aggSpec) + if gbaErr != nil { + return nil, gbaErr + } + plan = gba + case hasGroupBy: + return nil, fmt.Errorf("plan.Analyze: GroupBy without Agg is not supported in v1") + case hasAgg: + return nil, fmt.Errorf("plan.Analyze: Agg without GroupBy (scalar reduce) is not supported in v1") + } + + if t := req.GetTop(); t != nil { + asc := t.GetFieldValueSort() == 1 // SORT_ASC == 1 in modelv1.Sort + plan = NewTop(plan, t.GetFieldName(), int(t.GetNumber()), asc) + } + + limitN := req.GetLimit() + if limitN == 0 { + limitN = defaultLimit + } + plan = NewLimit(plan, req.GetOffset(), limitN) + + return plan, nil +} + +// buildTagProjection converts the proto tag projection into the model-level +// slice the BatchSchema builder consumes. 
+func buildTagProjection(req *measurev1.QueryRequest) []model.TagProjection { + tp := req.GetTagProjection() + if tp == nil { + return nil + } + families := tp.GetTagFamilies() + out := make([]model.TagProjection, 0, len(families)) + for _, tf := range families { + out = append(out, model.TagProjection{ + Family: tf.GetName(), + Names: append([]string(nil), tf.GetTags()...), + }) + } + return out +} + +// translateGroupByAgg builds the model GroupBy/Agg structs from the proto, +// validating that: +// - the GroupBy tag_projection names exactly one family with non-empty tags +// (v1 single-family limitation) +// - the named GroupBy tags exist in the Measure schema +// - the Agg field exists in the Measure schema +func translateGroupByAgg(req *measurev1.QueryRequest, measureSchema *databasev1.Measure) ( + *model.MeasureGroupBy, *model.MeasureAgg, error, +) { + gbProto := req.GetGroupBy() + families := gbProto.GetTagProjection().GetTagFamilies() + if len(families) == 0 { + return nil, nil, fmt.Errorf("plan.Analyze: GroupBy.tag_projection must list at least one tag family") + } + if len(families) > 1 { + return nil, nil, fmt.Errorf("plan.Analyze: GroupBy.tag_projection v1 supports a single tag family, got %d", len(families)) + } + family := families[0] + if len(family.GetTags()) == 0 { + return nil, nil, fmt.Errorf("plan.Analyze: GroupBy.tag_projection family %q has no tags", family.GetName()) + } + gb := &model.MeasureGroupBy{ + TagFamily: family.GetName(), + TagNames: append([]string(nil), family.GetTags()...), + } + if validateErr := validateGroupByTags(measureSchema, gb); validateErr != nil { + return nil, nil, validateErr + } + + aggProto := req.GetAgg() + if validateErr := validateAggField(measureSchema, aggProto.GetFieldName()); validateErr != nil { + return nil, nil, validateErr + } + agg := &model.MeasureAgg{ + FieldName: aggProto.GetFieldName(), + Func: aggProto.GetFunction(), + } + return gb, agg, nil +} + +// validateGroupByTags ensures every name in 
gb.TagNames exists within the +// configured tag family of measureSchema. +func validateGroupByTags(measureSchema *databasev1.Measure, gb *model.MeasureGroupBy) error { + for _, tf := range measureSchema.GetTagFamilies() { + if tf.GetName() != gb.TagFamily { + continue + } + known := make(map[string]struct{}, len(tf.GetTags())) + for _, ts := range tf.GetTags() { + known[ts.GetName()] = struct{}{} + } + for _, name := range gb.TagNames { + if _, ok := known[name]; !ok { + return fmt.Errorf("plan.Analyze: GroupBy tag %s.%s not present in measure schema", gb.TagFamily, name) + } + } + return nil + } + return fmt.Errorf("plan.Analyze: GroupBy tag family %q not present in measure schema", gb.TagFamily) +} + +// validateAggField ensures the agg field name is a field defined on the +// Measure schema. Type compatibility (int/float) is enforced later by the +// BatchAggregation operator. +func validateAggField(measureSchema *databasev1.Measure, fieldName string) error { + for _, fs := range measureSchema.GetFields() { + if fs.GetName() == fieldName { + return nil + } + } + return fmt.Errorf("plan.Analyze: Agg field %q not present in measure schema", fieldName) +} diff --git a/pkg/query/vectorized/measure/plan/analyzer_test.go b/pkg/query/vectorized/measure/plan/analyzer_test.go new file mode 100644 index 000000000..1605a3399 --- /dev/null +++ b/pkg/query/vectorized/measure/plan/analyzer_test.go @@ -0,0 +1,291 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package plan + +import ( + "strings" + "testing" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" +) + +// testMeasureSchema builds a minimal Measure schema with one "default" tag +// family containing "svc" + "region" tag specs and one "value" field. +func testMeasureSchema() *databasev1.Measure { + return &databasev1.Measure{ + Metadata: &commonv1.Metadata{Name: "demo", Group: "default"}, + TagFamilies: []*databasev1.TagFamilySpec{ + { + Name: "default", + Tags: []*databasev1.TagSpec{ + {Name: "svc", Type: databasev1.TagType_TAG_TYPE_STRING}, + {Name: "region", Type: databasev1.TagType_TAG_TYPE_STRING}, + }, + }, + }, + Fields: []*databasev1.FieldSpec{ + {Name: "value", FieldType: databasev1.FieldType_FIELD_TYPE_INT}, + }, + } +} + +func projTagProj() *modelv1.TagProjection { + return &modelv1.TagProjection{TagFamilies: []*modelv1.TagProjection_TagFamily{ + {Name: "default", Tags: []string{"svc"}}, + }} +} + +func TestAnalyze_BareRequest_BuildsScanWrappedInLimit(t *testing.T) { + req := &measurev1.QueryRequest{ + Name: "demo", + TagProjection: projTagProj(), + FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: []string{"value"}}, + } + p, err := Analyze(req, testMeasureSchema()) + if err != nil { + t.Fatalf("Analyze: %v", err) + } + if _, ok := 
p.(*Limit); !ok { + t.Fatalf("root should be *Limit, got %T", p) + } + if _, ok := p.Children()[0].(*Scan); !ok { + t.Fatalf("Limit child should be *Scan, got %T", p.Children()[0]) + } +} + +func TestAnalyze_GroupByAgg_BuildsGroupByAggBelowLimit(t *testing.T) { + req := &measurev1.QueryRequest{ + Name: "demo", + TagProjection: projTagProj(), + FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: []string{"value"}}, + GroupBy: &measurev1.QueryRequest_GroupBy{ + TagProjection: projTagProj(), + FieldName: "value", + }, + Agg: &measurev1.QueryRequest_Aggregation{ + Function: modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM, + FieldName: "value", + }, + } + p, err := Analyze(req, testMeasureSchema()) + if err != nil { + t.Fatalf("Analyze: %v", err) + } + limit, ok := p.(*Limit) + if !ok { + t.Fatalf("root should be *Limit, got %T", p) + } + gba, ok := limit.Child.(*GroupByAgg) + if !ok { + t.Fatalf("Limit child should be *GroupByAgg, got %T", limit.Child) + } + if _, ok := gba.Children()[0].(*Scan); !ok { + t.Fatalf("GroupByAgg child should be *Scan, got %T", gba.Children()[0]) + } + if gba.GroupBy.TagNames[0] != "svc" { + t.Fatalf("GroupBy.TagNames: want [svc], got %v", gba.GroupBy.TagNames) + } + if gba.Agg.FieldName != "value" { + t.Fatalf("Agg.FieldName: want value, got %s", gba.Agg.FieldName) + } +} + +func TestAnalyze_TopBetweenGroupByAggAndLimit(t *testing.T) { + req := &measurev1.QueryRequest{ + Name: "demo", + TagProjection: projTagProj(), + FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: []string{"value"}}, + GroupBy: &measurev1.QueryRequest_GroupBy{ + TagProjection: projTagProj(), + FieldName: "value", + }, + Agg: &measurev1.QueryRequest_Aggregation{ + Function: modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM, + FieldName: "value", + }, + Top: &measurev1.QueryRequest_Top{ + Number: 5, + FieldName: "value_sum", + FieldValueSort: modelv1.Sort_SORT_DESC, + }, + } + p, err := Analyze(req, testMeasureSchema()) + if err != nil { 
+ t.Fatalf("Analyze: %v", err) + } + limit, ok := p.(*Limit) + if !ok { + t.Fatalf("root: want *Limit, got %T", p) + } + top, ok := limit.Child.(*Top) + if !ok { + t.Fatalf("Limit child: want *Top, got %T", limit.Child) + } + if _, ok := top.Child.(*GroupByAgg); !ok { + t.Fatalf("Top child: want *GroupByAgg, got %T", top.Child) + } +} + +func TestAnalyze_GroupByWithoutAgg_Errors(t *testing.T) { + req := &measurev1.QueryRequest{ + Name: "demo", + TagProjection: projTagProj(), + FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: []string{"value"}}, + GroupBy: &measurev1.QueryRequest_GroupBy{ + TagProjection: projTagProj(), + FieldName: "value", + }, + } + _, err := Analyze(req, testMeasureSchema()) + if err == nil { + t.Fatal("GroupBy without Agg must error") + } + if !strings.Contains(err.Error(), "Agg") { + t.Fatalf("error should mention Agg, got %v", err) + } +} + +func TestAnalyze_AggWithoutGroupBy_Errors(t *testing.T) { + req := &measurev1.QueryRequest{ + Name: "demo", + TagProjection: projTagProj(), + FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: []string{"value"}}, + Agg: &measurev1.QueryRequest_Aggregation{ + Function: modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM, + FieldName: "value", + }, + } + _, err := Analyze(req, testMeasureSchema()) + if err == nil { + t.Fatal("Agg without GroupBy must error (scalar reduce not supported)") + } +} + +func TestAnalyze_UnknownGroupByTag_Errors(t *testing.T) { + req := &measurev1.QueryRequest{ + Name: "demo", + TagProjection: projTagProj(), + FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: []string{"value"}}, + GroupBy: &measurev1.QueryRequest_GroupBy{ + TagProjection: &modelv1.TagProjection{TagFamilies: []*modelv1.TagProjection_TagFamily{ + {Name: "default", Tags: []string{"missing"}}, + }}, + FieldName: "value", + }, + Agg: &measurev1.QueryRequest_Aggregation{ + Function: modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM, + FieldName: "value", + }, + } + _, err := 
Analyze(req, testMeasureSchema()) + if err == nil { + t.Fatal("unknown groupby tag must error") + } + if !strings.Contains(err.Error(), "missing") { + t.Fatalf("error should mention the missing tag, got %v", err) + } +} + +func TestAnalyze_UnknownAggField_Errors(t *testing.T) { + req := &measurev1.QueryRequest{ + Name: "demo", + TagProjection: projTagProj(), + FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: []string{"value"}}, + GroupBy: &measurev1.QueryRequest_GroupBy{ + TagProjection: projTagProj(), + FieldName: "value", + }, + Agg: &measurev1.QueryRequest_Aggregation{ + Function: modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM, + FieldName: "ghost", + }, + } + _, err := Analyze(req, testMeasureSchema()) + if err == nil { + t.Fatal("unknown agg field must error") + } + if !strings.Contains(err.Error(), "ghost") { + t.Fatalf("error should mention the missing field, got %v", err) + } +} + +func TestAnalyze_NilRequest_Errors(t *testing.T) { + _, err := Analyze(nil, testMeasureSchema()) + if err == nil { + t.Fatal("nil request must error") + } +} + +func TestAnalyze_NilSchema_Errors(t *testing.T) { + req := &measurev1.QueryRequest{Name: "demo"} + _, err := Analyze(req, nil) + if err == nil { + t.Fatal("nil schema must error") + } +} + +func TestAnalyze_DefaultLimit_AppliedWhenZero(t *testing.T) { + req := &measurev1.QueryRequest{ + Name: "demo", + TagProjection: projTagProj(), + FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: []string{"value"}}, + // Limit unset (0) → default 100 per defaultLimit constant + } + p, err := Analyze(req, testMeasureSchema()) + if err != nil { + t.Fatalf("Analyze: %v", err) + } + limit := p.(*Limit) + if limit.N != defaultLimit { + t.Fatalf("default limit: want %d, got %d", defaultLimit, limit.N) + } +} + +func TestPrintTree_RendersHierarchy(t *testing.T) { + req := &measurev1.QueryRequest{ + Name: "demo", + TagProjection: projTagProj(), + FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: 
[]string{"value"}}, + GroupBy: &measurev1.QueryRequest_GroupBy{ + TagProjection: projTagProj(), + FieldName: "value", + }, + Agg: &measurev1.QueryRequest_Aggregation{ + Function: modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM, + FieldName: "value", + }, + } + p, err := Analyze(req, testMeasureSchema()) + if err != nil { + t.Fatal(err) + } + out := PrintTree(p) + // Three lines: Limit / GroupByAgg / Scan, each at increasing indent. + if !strings.Contains(out, "Limit(") { + t.Fatalf("PrintTree must include Limit, got: %s", out) + } + if !strings.Contains(out, " GroupByAgg(") { + t.Fatalf("PrintTree must include indented GroupByAgg, got: %s", out) + } + if !strings.Contains(out, " Scan(") { + t.Fatalf("PrintTree must include double-indented Scan, got: %s", out) + } +}
