This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch vectorized-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 89900e66112a31ed6a682356b416b9b07e8e9888
Author: Hongtao Gao <[email protected]>
AuthorDate: Wed May 13 09:57:00 2026 +0000

    feat(query/vectorized/measure/plan): vec analyzer (G8b)
    
    Analyze(req *measurev1.QueryRequest, schema *databasev1.Measure)
    (VecPlan, error) translates a measure query into the vec plan tree
    introduced in G8a. This is the vec counterpart of the deprecated
    pkg/query/logical/measure.Analyze — it produces vec plan nodes
    directly, with no logical→physical resolution step and no leaf
    substitution into a row plan.
    
    Plan shape:
    
        Limit
          └─ (Top?)
              └─ (GroupByAgg if group_by + agg both set)
                  └─ Scan
    
    Coverage:
    
    - Static query parameters (Measure schema, time range, tag projection,
      field projection) populate Scan.Params at analyze time.
    - Runtime parameters that depend on the executor's MeasureExecutionContext
      (the resolved index.Query and the entity table) are left blank;
      the executor (G8c) fills them in immediately before invoking Build.
    - GroupBy + Agg are coalesced into a single GroupByAgg node — they
      pair to one BatchAggregation operator per G7d's planner.
    - Validation matches the G7d contract: GroupBy and Agg must come
      together; tag and field references must exist in the Measure schema;
      GroupBy tag_projection v1 supports a single tag family.
    - Top sits between GroupByAgg and Limit so the heap operates on
      aggregated rows when GroupBy+Agg is present.
    
    Tests cover the plan shapes (bare Scan under Limit, GroupByAgg under
    Limit, and Top stacked between GroupByAgg and Limit), the main validation
    errors (GroupBy without Agg, Agg without GroupBy, unknown groupby tag,
    unknown agg field, nil request, nil schema), the default-limit fallback
    when QueryRequest.Limit is zero, and PrintTree rendering for debugging.
---
 pkg/query/vectorized/measure/plan/analyzer.go      | 209 +++++++++++++++
 pkg/query/vectorized/measure/plan/analyzer_test.go | 291 +++++++++++++++++++++
 2 files changed, 500 insertions(+)

diff --git a/pkg/query/vectorized/measure/plan/analyzer.go 
b/pkg/query/vectorized/measure/plan/analyzer.go
new file mode 100644
index 000000000..69632f5ea
--- /dev/null
+++ b/pkg/query/vectorized/measure/plan/analyzer.go
@@ -0,0 +1,209 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package plan
+
import (
	"fmt"

	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
	measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
	"github.com/apache/skywalking-banyandb/pkg/query/model"
	measure "github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure"
	"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
+
// defaultLimit is the page size Analyze applies when QueryRequest.Limit is
// zero. It is kept equal to the row-path measure_analyzer.go default so the
// vec and row paths page identically for requests that leave Limit unset.
const defaultLimit = uint32(100)
+
// Analyze translates a measurev1.QueryRequest + its Measure schema into a
// VecPlan tree. It is the vec counterpart of pkg/query/logical/measure
// (deprecated) but produces vec plan nodes — there is no leaf
// substitution into a row plan.
//
// The resulting shape is Limit → (Top?) → (GroupByAgg?) → Scan. Top sits
// above GroupByAgg so it ranks aggregated rows when both are present, and
// Limit is always the root; when req.Limit is zero, defaultLimit is used.
//
// The returned tree is structural: it carries the static (proto-derived)
// query parameters in `Scan.Params` and the BatchSchema for downstream
// nodes to consult. Runtime fields that depend on the executor's
// MeasureExecutionContext — the resolved `index.Query` and the entity
// table — are NOT populated here; the executor (G8c) fills them in
// before invoking Build.
//
// Errors are returned for:
//   - nil request or nil schema
//   - any error from measure.BuildBatchSchema, wrapped with %w (e.g. a
//     tag/field projection naming columns not in the schema)
//   - GroupBy.tag_projection listing zero or several tag families, or a
//     family with no tags (v1 single-family limitation)
//   - GroupBy referencing a tag or tag family absent from the schema
//   - Agg referencing a field absent from the schema
//   - GroupBy set without Agg (raw groupby not supported in v1)
//   - Agg set without GroupBy (scalar reduce not supported in v1)
func Analyze(req *measurev1.QueryRequest, measureSchema *databasev1.Measure) (VecPlan, error) {
	if req == nil {
		return nil, fmt.Errorf("plan.Analyze: nil request")
	}
	if measureSchema == nil {
		return nil, fmt.Errorf("plan.Analyze: nil Measure schema")
	}

	tagProjection := buildTagProjection(req)
	fieldProjection := req.GetFieldProjection().GetNames()
	opts := model.MeasureQueryOptions{
		TagProjection:   tagProjection,
		FieldProjection: fieldProjection,
	}
	batchSchema, schemaErr := measure.BuildBatchSchema(measureSchema, opts)
	if schemaErr != nil {
		return nil, fmt.Errorf("plan.Analyze: %w", schemaErr)
	}

	var tr *timestamp.TimeRange
	if t := req.GetTimeRange(); t != nil {
		r := timestamp.NewInclusiveTimeRange(t.GetBegin().AsTime(), t.GetEnd().AsTime())
		tr = &r
	}

	var plan VecPlan = NewScan(batchSchema, ScanParams{
		Measure:         measureSchema,
		TimeRange:       tr,
		TagProjection:   tagProjection,
		FieldProjection: fieldProjection,
	})

	// GroupBy + Agg coalesced into a single GroupByAgg node. Validation
	// matches the G7d planner's contract.
	hasGroupBy := req.GetGroupBy() != nil
	hasAgg := req.GetAgg() != nil
	switch {
	case hasGroupBy && hasAgg:
		gb, aggSpec, validateErr := translateGroupByAgg(req, measureSchema)
		if validateErr != nil {
			return nil, validateErr
		}
		gba, gbaErr := NewGroupByAgg(plan, gb, aggSpec)
		if gbaErr != nil {
			return nil, gbaErr
		}
		plan = gba
	case hasGroupBy:
		return nil, fmt.Errorf("plan.Analyze: GroupBy without Agg is not supported in v1")
	case hasAgg:
		return nil, fmt.Errorf("plan.Analyze: Agg without GroupBy (scalar reduce) is not supported in v1")
	}

	if t := req.GetTop(); t != nil {
		// Compare against the named enum constant rather than a numeric
		// literal: in modelv1.Sort the value 1 is SORT_DESC, not SORT_ASC.
		// Only an explicit SORT_ASC ranks ascending; SORT_DESC and
		// SORT_UNSPECIFIED rank descending.
		asc := t.GetFieldValueSort() == modelv1.Sort_SORT_ASC
		plan = NewTop(plan, t.GetFieldName(), int(t.GetNumber()), asc)
	}

	limitN := req.GetLimit()
	if limitN == 0 {
		limitN = defaultLimit
	}
	plan = NewLimit(plan, req.GetOffset(), limitN)

	return plan, nil
}
+
+// buildTagProjection converts the proto tag projection into the model-level
+// slice the BatchSchema builder consumes.
+func buildTagProjection(req *measurev1.QueryRequest) []model.TagProjection {
+       tp := req.GetTagProjection()
+       if tp == nil {
+               return nil
+       }
+       families := tp.GetTagFamilies()
+       out := make([]model.TagProjection, 0, len(families))
+       for _, tf := range families {
+               out = append(out, model.TagProjection{
+                       Family: tf.GetName(),
+                       Names:  append([]string(nil), tf.GetTags()...),
+               })
+       }
+       return out
+}
+
+// translateGroupByAgg builds the model GroupBy/Agg structs from the proto,
+// validating that:
+//   - the GroupBy tag_projection names exactly one family with non-empty tags
+//     (v1 single-family limitation)
+//   - the named GroupBy tags exist in the Measure schema
+//   - the Agg field exists in the Measure schema
+func translateGroupByAgg(req *measurev1.QueryRequest, measureSchema 
*databasev1.Measure) (
+       *model.MeasureGroupBy, *model.MeasureAgg, error,
+) {
+       gbProto := req.GetGroupBy()
+       families := gbProto.GetTagProjection().GetTagFamilies()
+       if len(families) == 0 {
+               return nil, nil, fmt.Errorf("plan.Analyze: 
GroupBy.tag_projection must list at least one tag family")
+       }
+       if len(families) > 1 {
+               return nil, nil, fmt.Errorf("plan.Analyze: 
GroupBy.tag_projection v1 supports a single tag family, got %d", len(families))
+       }
+       family := families[0]
+       if len(family.GetTags()) == 0 {
+               return nil, nil, fmt.Errorf("plan.Analyze: 
GroupBy.tag_projection family %q has no tags", family.GetName())
+       }
+       gb := &model.MeasureGroupBy{
+               TagFamily: family.GetName(),
+               TagNames:  append([]string(nil), family.GetTags()...),
+       }
+       if validateErr := validateGroupByTags(measureSchema, gb); validateErr 
!= nil {
+               return nil, nil, validateErr
+       }
+
+       aggProto := req.GetAgg()
+       if validateErr := validateAggField(measureSchema, 
aggProto.GetFieldName()); validateErr != nil {
+               return nil, nil, validateErr
+       }
+       agg := &model.MeasureAgg{
+               FieldName: aggProto.GetFieldName(),
+               Func:      aggProto.GetFunction(),
+       }
+       return gb, agg, nil
+}
+
// validateGroupByTags checks that gb.TagFamily names a tag family declared on
// measureSchema (the first family with that name is used) and that every
// entry of gb.TagNames is a tag declared within that family. It returns an
// error naming the missing family or the first missing family.tag pair.
func validateGroupByTags(measureSchema *databasev1.Measure, gb *model.MeasureGroupBy) error {
	var (
		family *databasev1.TagFamilySpec
		found  bool
	)
	for _, candidate := range measureSchema.GetTagFamilies() {
		if candidate.GetName() == gb.TagFamily {
			family, found = candidate, true
			break
		}
	}
	if !found {
		return fmt.Errorf("plan.Analyze: GroupBy tag family %q not present in measure schema", gb.TagFamily)
	}

	declared := make(map[string]struct{}, len(family.GetTags()))
	for _, spec := range family.GetTags() {
		declared[spec.GetName()] = struct{}{}
	}
	for _, name := range gb.TagNames {
		if _, ok := declared[name]; !ok {
			return fmt.Errorf("plan.Analyze: GroupBy tag %s.%s not present in measure schema", gb.TagFamily, name)
		}
	}
	return nil
}
+
+// validateAggField ensures the agg field name is a field defined on the
+// Measure schema. Type compatibility (int/float) is enforced later by the
+// BatchAggregation operator.
+func validateAggField(measureSchema *databasev1.Measure, fieldName string) 
error {
+       for _, fs := range measureSchema.GetFields() {
+               if fs.GetName() == fieldName {
+                       return nil
+               }
+       }
+       return fmt.Errorf("plan.Analyze: Agg field %q not present in measure 
schema", fieldName)
+}
diff --git a/pkg/query/vectorized/measure/plan/analyzer_test.go 
b/pkg/query/vectorized/measure/plan/analyzer_test.go
new file mode 100644
index 000000000..1605a3399
--- /dev/null
+++ b/pkg/query/vectorized/measure/plan/analyzer_test.go
@@ -0,0 +1,291 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package plan
+
+import (
+       "strings"
+       "testing"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+)
+
+// testMeasureSchema builds a minimal Measure schema with one "default" tag
+// family containing "svc" + "region" tag specs and one "value" field.
+func testMeasureSchema() *databasev1.Measure {
+       return &databasev1.Measure{
+               Metadata: &commonv1.Metadata{Name: "demo", Group: "default"},
+               TagFamilies: []*databasev1.TagFamilySpec{
+                       {
+                               Name: "default",
+                               Tags: []*databasev1.TagSpec{
+                                       {Name: "svc", Type: 
databasev1.TagType_TAG_TYPE_STRING},
+                                       {Name: "region", Type: 
databasev1.TagType_TAG_TYPE_STRING},
+                               },
+                       },
+               },
+               Fields: []*databasev1.FieldSpec{
+                       {Name: "value", FieldType: 
databasev1.FieldType_FIELD_TYPE_INT},
+               },
+       }
+}
+
+func projTagProj() *modelv1.TagProjection {
+       return &modelv1.TagProjection{TagFamilies: 
[]*modelv1.TagProjection_TagFamily{
+               {Name: "default", Tags: []string{"svc"}},
+       }}
+}
+
// TestAnalyze_BareRequest_BuildsScanWrappedInLimit checks that a request with
// only projections yields Limit → Scan. The child count is checked before
// indexing so a malformed plan fails the test instead of panicking.
func TestAnalyze_BareRequest_BuildsScanWrappedInLimit(t *testing.T) {
	req := &measurev1.QueryRequest{
		Name:            "demo",
		TagProjection:   projTagProj(),
		FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: []string{"value"}},
	}
	p, err := Analyze(req, testMeasureSchema())
	if err != nil {
		t.Fatalf("Analyze: %v", err)
	}
	if _, ok := p.(*Limit); !ok {
		t.Fatalf("root should be *Limit, got %T", p)
	}
	children := p.Children()
	if len(children) != 1 {
		t.Fatalf("Limit should have exactly one child, got %d", len(children))
	}
	if _, ok := children[0].(*Scan); !ok {
		t.Fatalf("Limit child should be *Scan, got %T", children[0])
	}
}
+
// TestAnalyze_GroupByAgg_BuildsGroupByAggBelowLimit checks that GroupBy+Agg
// produce Limit → GroupByAgg → Scan and that the GroupBy tags and Agg field
// are carried through. Slice lengths are checked before indexing so a
// malformed plan fails the test instead of panicking.
func TestAnalyze_GroupByAgg_BuildsGroupByAggBelowLimit(t *testing.T) {
	req := &measurev1.QueryRequest{
		Name:            "demo",
		TagProjection:   projTagProj(),
		FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: []string{"value"}},
		GroupBy: &measurev1.QueryRequest_GroupBy{
			TagProjection: projTagProj(),
			FieldName:     "value",
		},
		Agg: &measurev1.QueryRequest_Aggregation{
			Function:  modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM,
			FieldName: "value",
		},
	}
	p, err := Analyze(req, testMeasureSchema())
	if err != nil {
		t.Fatalf("Analyze: %v", err)
	}
	limit, ok := p.(*Limit)
	if !ok {
		t.Fatalf("root should be *Limit, got %T", p)
	}
	gba, ok := limit.Child.(*GroupByAgg)
	if !ok {
		t.Fatalf("Limit child should be *GroupByAgg, got %T", limit.Child)
	}
	children := gba.Children()
	if len(children) != 1 {
		t.Fatalf("GroupByAgg should have exactly one child, got %d", len(children))
	}
	if _, ok := children[0].(*Scan); !ok {
		t.Fatalf("GroupByAgg child should be *Scan, got %T", children[0])
	}
	if len(gba.GroupBy.TagNames) != 1 || gba.GroupBy.TagNames[0] != "svc" {
		t.Fatalf("GroupBy.TagNames: want [svc], got %v", gba.GroupBy.TagNames)
	}
	if gba.Agg.FieldName != "value" {
		t.Fatalf("Agg.FieldName: want value, got %s", gba.Agg.FieldName)
	}
}
+
// TestAnalyze_TopBetweenGroupByAggAndLimit checks that a request combining
// GroupBy, Agg and Top yields Limit → Top → GroupByAgg.
func TestAnalyze_TopBetweenGroupByAggAndLimit(t *testing.T) {
	groupBy := &measurev1.QueryRequest_GroupBy{
		TagProjection: projTagProj(),
		FieldName:     "value",
	}
	agg := &measurev1.QueryRequest_Aggregation{
		Function:  modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM,
		FieldName: "value",
	}
	topSpec := &measurev1.QueryRequest_Top{
		Number:         5,
		FieldName:      "value_sum",
		FieldValueSort: modelv1.Sort_SORT_DESC,
	}
	req := &measurev1.QueryRequest{
		Name:            "demo",
		TagProjection:   projTagProj(),
		FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: []string{"value"}},
		GroupBy:         groupBy,
		Agg:             agg,
		Top:             topSpec,
	}

	root, err := Analyze(req, testMeasureSchema())
	if err != nil {
		t.Fatalf("Analyze: %v", err)
	}
	limitNode, isLimit := root.(*Limit)
	if !isLimit {
		t.Fatalf("root: want *Limit, got %T", root)
	}
	topNode, isTop := limitNode.Child.(*Top)
	if !isTop {
		t.Fatalf("Limit child: want *Top, got %T", limitNode.Child)
	}
	if _, isGBA := topNode.Child.(*GroupByAgg); !isGBA {
		t.Fatalf("Top child: want *GroupByAgg, got %T", topNode.Child)
	}
}
+
// TestAnalyze_GroupByWithoutAgg_Errors checks that a GroupBy with no Agg is
// rejected and that the error mentions Agg.
func TestAnalyze_GroupByWithoutAgg_Errors(t *testing.T) {
	groupBy := &measurev1.QueryRequest_GroupBy{
		TagProjection: projTagProj(),
		FieldName:     "value",
	}
	req := &measurev1.QueryRequest{
		Name:            "demo",
		TagProjection:   projTagProj(),
		FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: []string{"value"}},
		GroupBy:         groupBy,
	}
	_, err := Analyze(req, testMeasureSchema())
	switch {
	case err == nil:
		t.Fatal("GroupBy without Agg must error")
	case !strings.Contains(err.Error(), "Agg"):
		t.Fatalf("error should mention Agg, got %v", err)
	}
}
+
+func TestAnalyze_AggWithoutGroupBy_Errors(t *testing.T) {
+       req := &measurev1.QueryRequest{
+               Name:            "demo",
+               TagProjection:   projTagProj(),
+               FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: 
[]string{"value"}},
+               Agg: &measurev1.QueryRequest_Aggregation{
+                       Function:  
modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM,
+                       FieldName: "value",
+               },
+       }
+       _, err := Analyze(req, testMeasureSchema())
+       if err == nil {
+               t.Fatal("Agg without GroupBy must error (scalar reduce not 
supported)")
+       }
+}
+
// TestAnalyze_UnknownGroupByTag_Errors checks that grouping by a tag the
// schema does not declare is rejected and that the error names the tag.
func TestAnalyze_UnknownGroupByTag_Errors(t *testing.T) {
	missingTagProjection := &modelv1.TagProjection{TagFamilies: []*modelv1.TagProjection_TagFamily{
		{Name: "default", Tags: []string{"missing"}},
	}}
	req := &measurev1.QueryRequest{
		Name:            "demo",
		TagProjection:   projTagProj(),
		FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: []string{"value"}},
		GroupBy: &measurev1.QueryRequest_GroupBy{
			TagProjection: missingTagProjection,
			FieldName:     "value",
		},
		Agg: &measurev1.QueryRequest_Aggregation{
			Function:  modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM,
			FieldName: "value",
		},
	}
	_, err := Analyze(req, testMeasureSchema())
	switch {
	case err == nil:
		t.Fatal("unknown groupby tag must error")
	case !strings.Contains(err.Error(), "missing"):
		t.Fatalf("error should mention the missing tag, got %v", err)
	}
}
+
// TestAnalyze_UnknownAggField_Errors checks that aggregating a field the
// schema does not declare is rejected and that the error names the field.
func TestAnalyze_UnknownAggField_Errors(t *testing.T) {
	ghostAgg := &measurev1.QueryRequest_Aggregation{
		Function:  modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM,
		FieldName: "ghost",
	}
	req := &measurev1.QueryRequest{
		Name:            "demo",
		TagProjection:   projTagProj(),
		FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: []string{"value"}},
		GroupBy: &measurev1.QueryRequest_GroupBy{
			TagProjection: projTagProj(),
			FieldName:     "value",
		},
		Agg: ghostAgg,
	}
	_, err := Analyze(req, testMeasureSchema())
	switch {
	case err == nil:
		t.Fatal("unknown agg field must error")
	case !strings.Contains(err.Error(), "ghost"):
		t.Fatalf("error should mention the missing field, got %v", err)
	}
}
+
// TestAnalyze_NilRequest_Errors checks that a nil request is rejected.
func TestAnalyze_NilRequest_Errors(t *testing.T) {
	if _, err := Analyze(nil, testMeasureSchema()); err == nil {
		t.Fatal("nil request must error")
	}
}
+
// TestAnalyze_NilSchema_Errors checks that a nil Measure schema is rejected
// even when the request itself is well-formed.
func TestAnalyze_NilSchema_Errors(t *testing.T) {
	if _, err := Analyze(&measurev1.QueryRequest{Name: "demo"}, nil); err == nil {
		t.Fatal("nil schema must error")
	}
}
+
// TestAnalyze_DefaultLimit_AppliedWhenZero checks that an unset (zero) Limit
// falls back to defaultLimit. The root type assertion is checked, so an
// unexpected plan shape fails the test instead of panicking.
func TestAnalyze_DefaultLimit_AppliedWhenZero(t *testing.T) {
	req := &measurev1.QueryRequest{
		Name:            "demo",
		TagProjection:   projTagProj(),
		FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: []string{"value"}},
		// Limit unset (0) → default 100 per defaultLimit constant
	}
	p, err := Analyze(req, testMeasureSchema())
	if err != nil {
		t.Fatalf("Analyze: %v", err)
	}
	limit, ok := p.(*Limit)
	if !ok {
		t.Fatalf("root should be *Limit, got %T", p)
	}
	if limit.N != defaultLimit {
		t.Fatalf("default limit: want %d, got %d", defaultLimit, limit.N)
	}
}
+
+func TestPrintTree_RendersHierarchy(t *testing.T) {
+       req := &measurev1.QueryRequest{
+               Name:            "demo",
+               TagProjection:   projTagProj(),
+               FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: 
[]string{"value"}},
+               GroupBy: &measurev1.QueryRequest_GroupBy{
+                       TagProjection: projTagProj(),
+                       FieldName:     "value",
+               },
+               Agg: &measurev1.QueryRequest_Aggregation{
+                       Function:  
modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM,
+                       FieldName: "value",
+               },
+       }
+       p, err := Analyze(req, testMeasureSchema())
+       if err != nil {
+               t.Fatal(err)
+       }
+       out := PrintTree(p)
+       // Three lines: Limit / GroupByAgg / Scan, each at increasing indent.
+       if !strings.Contains(out, "Limit(") {
+               t.Fatalf("PrintTree must include Limit, got: %s", out)
+       }
+       if !strings.Contains(out, "  GroupByAgg(") {
+               t.Fatalf("PrintTree must include indented GroupByAgg, got: %s", 
out)
+       }
+       if !strings.Contains(out, "    Scan(") {
+               t.Fatalf("PrintTree must include double-indented Scan, got: 
%s", out)
+       }
+}

Reply via email to