This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch vectorized-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit bd9f9a08cadeae8342652ce6cafc1b0d2a2fb42a
Author: Hongtao Gao <[email protected]>
AuthorDate: Wed May 13 09:56:44 2026 +0000

    feat(query/vectorized/measure/plan): VecPlan tree scaffolding (G8a)
    
    New package pkg/query/vectorized/measure/plan defines the vectorized
    measure-query plan tree — a peer of pkg/query/logical/measure (deprecated,
    row path) that shares no plan node types, executor, or iterator
    machinery. Per the G8 directive, top-level dispatch routes to one OR the
    other based on VectorizedConfig.Enabled; no leaf substitution into the
    row plan.
    
    This commit lands the scaffolding:
    
    - VecPlan interface: Schema, Children, Build(ctx, *BuildContext), String.
    - BuildContext: threads the shared PipelineBuilder, MemoryTracker (G7a),
      and VectorizedConfig through every node's Build call.
    - Scan: leaf node; carries the analyzed schema and the executor-supplied
      PullOperator source. Build attaches the source via Builder.From.
    - Limit: fusible node wrapping BatchLimit; schema-preserving.
    - Top: breaker node wrapping BatchTop; schema-preserving. Reuses the
      existing single-heap BatchTop semantics — per-timestamp partitioning is
      tracked separately.
    - GroupByAgg: breaker node that delegates operator construction to G7d's
      BuildOperators, which today emits a single BatchAggregation that does
      both grouping and folding. Schema-rewriting: drops timestamp and adds
      the agg result field (the auto-derived `<field>_<func>` column name).
    - PrintTree: helper to render a tree as multi-line indented text for
      debugging.
    
    Tests (build_test.go) drive each node against a fake PullOperator,
    verifying:
    - Scan + Limit emits source batches unchanged when N exceeds the row
      count.
    - Scan + GroupByAgg + Limit aggregates by key correctly (svc tag, SUM
      value).
    - Scan with unset Source surfaces a programmer-error message.
    - GroupByAgg.Schema drops timestamp and adds the agg result column with
      the expected output name.
---
 pkg/query/vectorized/measure/plan/build_test.go  | 231 +++++++++++++++++++++++
 pkg/query/vectorized/measure/plan/groupby_agg.go | 124 ++++++++++++
 pkg/query/vectorized/measure/plan/limit.go       |  76 ++++++++
 pkg/query/vectorized/measure/plan/plan.go        | 100 ++++++++++
 pkg/query/vectorized/measure/plan/scan.go        |  87 +++++++++
 pkg/query/vectorized/measure/plan/top.go         |  90 +++++++++
 6 files changed, 708 insertions(+)

diff --git a/pkg/query/vectorized/measure/plan/build_test.go 
b/pkg/query/vectorized/measure/plan/build_test.go
new file mode 100644
index 000000000..e04a33d82
--- /dev/null
+++ b/pkg/query/vectorized/measure/plan/build_test.go
@@ -0,0 +1,231 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package plan
+
+import (
+       "context"
+       "testing"
+
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query/model"
+       "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+       measure 
"github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure"
+)
+
+// fakePullSource emits a fixed sequence of RecordBatches then EOF.
+type fakePullSource struct {
+       schema  *vectorized.BatchSchema
+       batches []*vectorized.RecordBatch
+       idx     int
+}
+
+func (f *fakePullSource) Init(_ context.Context) error          { return nil }
+func (f *fakePullSource) OutputSchema() *vectorized.BatchSchema { return 
f.schema }
+func (f *fakePullSource) Close() error                          { return nil }
+func (f *fakePullSource) NextBatch(_ context.Context) 
(*vectorized.RecordBatch, error) {
+       if f.idx >= len(f.batches) {
+               return nil, nil
+       }
+       b := f.batches[f.idx]
+       f.idx++
+       return b, nil
+}
+
+// buildScanInput constructs a BatchSchema + RecordBatch shaped like a Scan
+// output: (timestamp, version, sid, shardID) + (svc tag) + (value field).
+// The aggregation pipeline projects svc + value out of this layout.
+func buildScanInput(t *testing.T) (*vectorized.BatchSchema, 
*vectorized.RecordBatch) {
+       t.Helper()
+       schema := vectorized.NewBatchSchema([]vectorized.ColumnDef{
+               {Role: vectorized.RoleTimestamp, Type: 
vectorized.ColumnTypeInt64},
+               {Role: vectorized.RoleVersion, Type: 
vectorized.ColumnTypeInt64},
+               {Role: vectorized.RoleSeriesID, Type: 
vectorized.ColumnTypeInt64},
+               {Role: vectorized.RoleShardID, Type: 
vectorized.ColumnTypeInt64},
+               {Role: vectorized.RoleTag, TagFamily: "default", Name: "svc", 
Type: vectorized.ColumnTypeString},
+               {Role: vectorized.RoleField, Name: "value", Type: 
vectorized.ColumnTypeInt64},
+       })
+       b := vectorized.NewRecordBatch(schema, 5)
+       pushRow := func(ts int64, svc string, v int64) {
+               b.Columns[0].(*vectorized.TypedColumn[int64]).Append(ts)
+               b.Columns[1].(*vectorized.TypedColumn[int64]).Append(1)
+               b.Columns[2].(*vectorized.TypedColumn[int64]).Append(1)
+               b.Columns[3].(*vectorized.TypedColumn[int64]).Append(0)
+               b.Columns[4].(*vectorized.TypedColumn[string]).Append(svc)
+               b.Columns[5].(*vectorized.TypedColumn[int64]).Append(v)
+       }
+       pushRow(1, "a", 1)
+       pushRow(2, "b", 4)
+       pushRow(3, "a", 2)
+       pushRow(4, "a", 3)
+       pushRow(5, "b", 5)
+       b.Len = 5
+       return schema, b
+}
+
+// drainPipeline calls p.Next until it yields a nil batch and returns every
+// non-nil batch in arrival order. Any error from Next fails the test
+// immediately. Callers must not modify the returned batches afterwards.
+func drainPipeline(t *testing.T, p *vectorized.Pipeline) []*vectorized.RecordBatch {
+       t.Helper()
+       var collected []*vectorized.RecordBatch
+       for {
+               batch, nextErr := p.Next(context.Background())
+               if nextErr != nil {
+                       t.Fatalf("pipeline Next: %v", nextErr)
+               }
+               // A nil batch marks end of stream.
+               if batch == nil {
+                       return collected
+               }
+               collected = append(collected, batch)
+       }
+}
+
+func TestScanLimit_Build_EmitsSourceBatchesUnchanged(t *testing.T) {
+       schema, batch := buildScanInput(t)
+       src := &fakePullSource{schema: schema, batches: 
[]*vectorized.RecordBatch{batch}}
+
+       scan := NewScan(schema, ScanParams{})
+       scan.Source = src
+       root := NewLimit(scan, 0, 10) // limit > total → no rows dropped
+
+       tracker := vectorized.NewMemoryTracker(1 << 20)
+       bc := &BuildContext{
+               Builder: 
vectorized.NewPipelineBuilder().WithMemoryTracker(tracker),
+               Tracker: tracker,
+               Config:  measure.VectorizedConfig{BatchSize: 1024, 
QueryMemoryMiB: 1},
+       }
+       if buildErr := root.Build(context.Background(), bc); buildErr != nil {
+               t.Fatalf("Build: %v", buildErr)
+       }
+       pipeline, err := bc.Builder.Build()
+       if err != nil {
+               t.Fatalf("PipelineBuilder.Build: %v", err)
+       }
+       if initErr := pipeline.Init(context.Background()); initErr != nil {
+               t.Fatal(initErr)
+       }
+       batches := drainPipeline(t, pipeline)
+       if len(batches) != 1 {
+               t.Fatalf("want 1 batch, got %d", len(batches))
+       }
+       if batches[0].Len != 5 {
+               t.Fatalf("want 5 rows, got %d", batches[0].Len)
+       }
+}
+
+func TestScanGroupByAggLimit_Build_AggregatesByKey(t *testing.T) {
+       schema, batch := buildScanInput(t)
+       src := &fakePullSource{schema: schema, batches: 
[]*vectorized.RecordBatch{batch}}
+
+       scan := NewScan(schema, ScanParams{})
+       scan.Source = src
+       gba, err := NewGroupByAgg(scan,
+               &model.MeasureGroupBy{TagFamily: "default", TagNames: 
[]string{"svc"}},
+               &model.MeasureAgg{FieldName: "value", Func: 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM},
+       )
+       if err != nil {
+               t.Fatalf("NewGroupByAgg: %v", err)
+       }
+       root := NewLimit(gba, 0, 10)
+
+       tracker := vectorized.NewMemoryTracker(1 << 20)
+       bc := &BuildContext{
+               Builder: 
vectorized.NewPipelineBuilder().WithMemoryTracker(tracker),
+               Tracker: tracker,
+               Config:  measure.VectorizedConfig{BatchSize: 1024, 
QueryMemoryMiB: 1},
+       }
+       if buildErr := root.Build(context.Background(), bc); buildErr != nil {
+               t.Fatalf("Build: %v", buildErr)
+       }
+       pipeline, buildErr := bc.Builder.Build()
+       if buildErr != nil {
+               t.Fatalf("PipelineBuilder.Build: %v", buildErr)
+       }
+       if initErr := pipeline.Init(context.Background()); initErr != nil {
+               t.Fatal(initErr)
+       }
+       batches := drainPipeline(t, pipeline)
+       totalRows := 0
+       for _, b := range batches {
+               totalRows += b.Len
+       }
+       if totalRows != 2 {
+               t.Fatalf("want 2 aggregated rows (2 groups), got %d", totalRows)
+       }
+
+       // Verify by-key sums: a=6, b=9.
+       bySvc := map[string]int64{}
+       for _, b := range batches {
+               svcCol := b.Columns[0].(*vectorized.TypedColumn[string])
+               sumCol := b.Columns[1].(*vectorized.TypedColumn[int64])
+               for i := 0; i < b.Len; i++ {
+                       bySvc[svcCol.Data()[i]] = sumCol.Data()[i]
+               }
+       }
+       if bySvc["a"] != 6 {
+               t.Fatalf("sum(a): want 6, got %d", bySvc["a"])
+       }
+       if bySvc["b"] != 9 {
+               t.Fatalf("sum(b): want 9, got %d", bySvc["b"])
+       }
+}
+
+func TestScan_Build_MissingSource_Errors(t *testing.T) {
+       schema, _ := buildScanInput(t)
+       scan := NewScan(schema, ScanParams{}) // Source intentionally unset
+       tracker := vectorized.NewMemoryTracker(1 << 20)
+       bc := &BuildContext{
+               Builder: 
vectorized.NewPipelineBuilder().WithMemoryTracker(tracker),
+               Tracker: tracker,
+               Config:  measure.VectorizedConfig{BatchSize: 1024, 
QueryMemoryMiB: 1},
+       }
+       if err := scan.Build(context.Background(), bc); err == nil {
+               t.Fatal("Build with nil Source must error")
+       }
+}
+
+func TestGroupByAgg_Schema_DropsTimestampAddsAggField(t *testing.T) {
+       schema, _ := buildScanInput(t)
+       src := &fakePullSource{schema: schema}
+       scan := NewScan(schema, ScanParams{})
+       scan.Source = src
+       gba, err := NewGroupByAgg(scan,
+               &model.MeasureGroupBy{TagFamily: "default", TagNames: 
[]string{"svc"}},
+               &model.MeasureAgg{FieldName: "value", Func: 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM},
+       )
+       if err != nil {
+               t.Fatalf("NewGroupByAgg: %v", err)
+       }
+       out := gba.Schema()
+       if out == nil {
+               t.Fatal("GroupByAgg.Schema must not be nil after BuildOperators 
resolves it")
+       }
+       if out.TimestampIndex() >= 0 {
+               t.Fatalf("aggregation output must drop timestamp (D2); got 
index %d", out.TimestampIndex())
+       }
+       // 2 columns: svc key + value_sum agg result.
+       if len(out.Columns) != 2 {
+               t.Fatalf("want 2 output columns (key + agg result), got %d", 
len(out.Columns))
+       }
+       if out.Columns[0].Name != "svc" {
+               t.Fatalf("col 0 should be the svc key, got %s", 
out.Columns[0].Name)
+       }
+       if out.Columns[1].Name != "value_sum" {
+               t.Fatalf("col 1 should be value_sum, got %s", 
out.Columns[1].Name)
+       }
+}
diff --git a/pkg/query/vectorized/measure/plan/groupby_agg.go 
b/pkg/query/vectorized/measure/plan/groupby_agg.go
new file mode 100644
index 000000000..df678440b
--- /dev/null
+++ b/pkg/query/vectorized/measure/plan/groupby_agg.go
@@ -0,0 +1,124 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package plan
+
+import (
+       "context"
+       "fmt"
+       "strings"
+
+       "github.com/apache/skywalking-banyandb/pkg/query/model"
+       "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+       vmeasure 
"github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure"
+)
+
+// GroupByAgg is the v1 vec aggregation node: GroupBy + single Aggregation
+// fused into one BatchAggregation operator (see G7d planner). The
+// operator does its own grouping via keyIndices and folding via aggs.
+//
+// Schema-rewriting: output is key columns + one agg result column. The
+// timestamp column is dropped, so serializeBatchToProto emits
+// DataPoint.Timestamp == nil for every output row (per G7 design
+// decision D2).
+type GroupByAgg struct {
+       Child       VecPlan
+       GroupBy     *model.MeasureGroupBy
+       Agg         *model.MeasureAgg
+       outputCache *vectorized.BatchSchema
+}
+
+// NewGroupByAgg constructs a GroupByAgg node wrapping child. Returns an
+// error if groupBy, agg, or child is nil (v1 requires both GroupBy and
+// Agg: scalar reduce and raw groupby are not supported).
+func NewGroupByAgg(child VecPlan, groupBy *model.MeasureGroupBy, agg 
*model.MeasureAgg) (*GroupByAgg, error) {
+       if groupBy == nil {
+               return nil, fmt.Errorf("plan.GroupByAgg: GroupBy must not be 
nil (scalar reduce not supported in v1)")
+       }
+       if agg == nil {
+               return nil, fmt.Errorf("plan.GroupByAgg: Agg must not be nil 
(raw groupby not supported in v1)")
+       }
+       if child == nil {
+               return nil, fmt.Errorf("plan.GroupByAgg: Child must not be nil")
+       }
+       return &GroupByAgg{Child: child, GroupBy: groupBy, Agg: agg}, nil
+}
+
+// Schema returns the aggregation output schema. The schema is computed
+// lazily on first call by running BuildOperators against the child
+// schema; subsequent calls return the cached value.
+//
+// Schema returns nil when Child is nil, when the child has no schema, or
+// when BuildOperators fails or does not return exactly one operator. The
+// underlying error is discarded, and a nil result is not cached, so a
+// later call runs the planner again.
+func (g *GroupByAgg) Schema() *vectorized.BatchSchema {
+       // Fast path: already resolved by an earlier Schema or Build call.
+       if g.outputCache != nil {
+               return g.outputCache
+       }
+       if g.Child == nil {
+               return nil
+       }
+       inputSchema := g.Child.Schema()
+       if inputSchema == nil {
+               return nil
+       }
+       // Synthesize a transient MeasureQueryOptions to drive the planner.
+       opts := model.MeasureQueryOptions{GroupBy: g.GroupBy, Agg: g.Agg}
+       // A throwaway tracker; we only need the resulting operator's
+       // OutputSchema, not its bookkeeping.
+       tracker := vectorized.NewMemoryTracker(1 << 30)
+       // The operator built here is used only for its schema and is never
+       // attached to a pipeline; 1024 is a placeholder batch size.
+       ops, err := vmeasure.BuildOperators(opts, inputSchema, tracker, 1024)
+       if err != nil || len(ops) != 1 {
+               return nil
+       }
+       g.outputCache = ops[0].OutputSchema()
+       return g.outputCache
+}
+
+// Children returns the single child.
+func (g *GroupByAgg) Children() []VecPlan { return []VecPlan{g.Child} }
+
+// Build recurses into child, then constructs the BatchAggregation via
+// BuildOperators and attaches it as a breaker. The pipeline-shared
+// MemoryTracker from bc threads through.
+func (g *GroupByAgg) Build(ctx context.Context, bc *BuildContext) error {
+       if buildErr := g.Child.Build(ctx, bc); buildErr != nil {
+               return buildErr
+       }
+       inputSchema := g.Child.Schema()
+       opts := model.MeasureQueryOptions{GroupBy: g.GroupBy, Agg: g.Agg}
+       ops, opsErr := vmeasure.BuildOperators(opts, inputSchema, bc.Tracker, 
bc.Config.BatchSize)
+       if opsErr != nil {
+               return fmt.Errorf("plan.GroupByAgg.Build: %w", opsErr)
+       }
+       if len(ops) != 1 {
+               return fmt.Errorf("plan.GroupByAgg.Build: expected 1 operator, 
got %d", len(ops))
+       }
+       bc.Builder.Break(ops[0])
+       // Cache for subsequent Schema() queries — saves re-running the planner.
+       g.outputCache = ops[0].OutputSchema()
+       return nil
+}
+
+// String returns a single-line debug description of the form
+// "GroupByAgg(keys=<comma-joined tag names>, fn=<func>, field=<name>)".
+// A nil GroupBy yields empty keys; a nil Agg yields empty fn and field,
+// so String is safe to call on a partially populated node.
+func (g *GroupByAgg) String() string {
+       tagNames := ""
+       if g.GroupBy != nil {
+               tagNames = strings.Join(g.GroupBy.TagNames, ",")
+       }
+       field, fn := "", ""
+       if g.Agg != nil {
+               field = g.Agg.FieldName
+               // fmt.Sprint formats the value exactly as the %v verb would.
+               fn = fmt.Sprint(g.Agg.Func)
+       }
+       return fmt.Sprintf("GroupByAgg(keys=%s, fn=%v, field=%s)", tagNames, fn, field)
+}
diff --git a/pkg/query/vectorized/measure/plan/limit.go 
b/pkg/query/vectorized/measure/plan/limit.go
new file mode 100644
index 000000000..2c3d1f5f9
--- /dev/null
+++ b/pkg/query/vectorized/measure/plan/limit.go
@@ -0,0 +1,76 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package plan
+
+import (
+       "context"
+       "fmt"
+
+       "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+       measure 
"github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure"
+)
+
+// Limit applies offset+limit windowing as a fusible operator on the
+// pipeline. Schema-preserving: emits the same column layout as its child.
+//
+// N == 0 means "no limit": Build attaches no operator in that case, so
+// Offset is not applied either. The analyzer normalises QueryRequest.Limit
+// of zero to the row-path default (100) before constructing this node.
+type Limit struct {
+       Child  VecPlan
+       Offset uint32
+       N      uint32
+}
+
+// NewLimit constructs a Limit node wrapping child.
+func NewLimit(child VecPlan, offset, n uint32) *Limit {
+       return &Limit{Child: child, Offset: offset, N: n}
+}
+
+// Schema returns the child's schema (Limit is schema-preserving).
+func (l *Limit) Schema() *vectorized.BatchSchema {
+       if l.Child == nil {
+               return nil
+       }
+       return l.Child.Schema()
+}
+
+// Children returns the single child.
+func (l *Limit) Children() []VecPlan { return []VecPlan{l.Child} }
+
+// Build first builds the child, then appends a BatchLimit to the pipeline
+// as a fusible operator. When N is zero nothing is appended, so downstream
+// nodes still see the child's full output. A nil Child or a failing child
+// Build is reported as an error.
+func (l *Limit) Build(ctx context.Context, bc *BuildContext) error {
+       child := l.Child
+       if child == nil {
+               return fmt.Errorf("plan.Limit.Build: Child is nil")
+       }
+       if err := child.Build(ctx, bc); err != nil {
+               return err
+       }
+       if l.N > 0 {
+               bc.Builder.Apply(measure.NewBatchLimit(l.Schema(), l.Offset, l.N))
+       }
+       return nil
+}
+
+// String returns a single-line debug description.
+func (l *Limit) String() string {
+       return fmt.Sprintf("Limit(offset=%d, n=%d)", l.Offset, l.N)
+}
diff --git a/pkg/query/vectorized/measure/plan/plan.go 
b/pkg/query/vectorized/measure/plan/plan.go
new file mode 100644
index 000000000..e061802c7
--- /dev/null
+++ b/pkg/query/vectorized/measure/plan/plan.go
@@ -0,0 +1,100 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package plan is the vectorized measure-query plan tree (G8).
+//
+// This package is a peer of pkg/query/logical/measure (deprecated, row
+// path); the two share no plan-node types, executor wiring, or iterator
+// machinery. Top-level dispatch (banyand/query/processor.go, G8d) routes
+// requests to one OR the other based on VectorizedConfig.Enabled.
+//
+// A VecPlan node knows its output BatchSchema, its children, and how to
+// append itself to a *vectorized.PipelineBuilder during Build. Build is
+// bottom-up: a node calls Build on its child first, then attaches its own
+// operator. When the root's Build returns nil, bc.Builder holds the
+// fully-composed PipelineBuilder, which the executor (G8c) finalizes via
+// builder.Build() to produce a *vectorized.Pipeline.
+package plan
+
+import (
+       "context"
+
+       "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+       measure 
"github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure"
+)
+
+// VecPlan is the vectorized measure-query plan node interface.
+//
+// Schema returns the BatchSchema of rows this node emits — for fusible
+// or schema-preserving breakers it matches the input schema; for
+// schema-rewriting breakers (BatchAggregation) it is the operator's
+// OutputSchema.
+//
+// Children returns the immediate child plan nodes (zero for leaves).
+//
+// Build appends this node's contribution to bc.Builder. The leaf (Scan)
+// calls Builder.From; fusible nodes call Builder.Apply; breakers call
+// Builder.Break. Build must call Build on its children before attaching
+// itself so the source flows in tree order.
+//
+// String returns a single-line debug description ("Scan(measure=foo)",
+// "GroupByAgg(keys=svc, fn=sum, field=value)", etc.) so plan trees can be
+// pretty-printed with PrintTree.
+type VecPlan interface {
+       Schema() *vectorized.BatchSchema
+       Children() []VecPlan
+       Build(ctx context.Context, bc *BuildContext) error
+       String() string
+}
+
+// BuildContext is the cross-cutting state threaded through every Build
+// call. Builder accumulates operators; Tracker is the shared per-pipeline
+// MemoryTracker (G7a) every memory-bookkeeping operator must use; Config
+// supplies BatchSize and other runtime knobs.
+type BuildContext struct {
+       Builder *vectorized.PipelineBuilder
+       Tracker *vectorized.MemoryTracker
+       Config  measure.VectorizedConfig
+}
+
+// PrintTree renders a plan tree as multi-line text, one node per line.
+// The root is written in the first column and each child is indented two
+// spaces deeper than its parent. A nil root or nil child interface value
+// is rendered as "<nil>" rather than panicking. Useful for debugging
+// analyzer output.
+func PrintTree(root VecPlan) string {
+       var sb stringBuilder
+       printNode(&sb, root, 0)
+       return sb.String()
+}
+
+// printNode writes node's String() on its own line, prefixed by two spaces
+// per depth level, then recurses into node.Children() at depth+1.
+func printNode(sb *stringBuilder, node VecPlan, depth int) {
+       for range depth {
+               sb.WriteString("  ")
+       }
+       // Single-child nodes return []VecPlan{Child} even when Child is nil,
+       // so a nil can reach this point from a partially built tree.
+       if node == nil {
+               sb.WriteString("<nil>\n")
+               return
+       }
+       sb.WriteString(node.String())
+       sb.WriteString("\n")
+       for _, child := range node.Children() {
+               printNode(sb, child, depth+1)
+       }
+}
+
+// stringBuilder is a minimal append-only text accumulator backed by a
+// byte slice. It lets this file avoid importing strings; other files in
+// the package already import strings, so strings.Builder would work too.
+type stringBuilder struct{ buf []byte }
+
+func (s *stringBuilder) WriteString(str string) { s.buf = append(s.buf, 
str...) }
+func (s *stringBuilder) String() string         { return string(s.buf) }
diff --git a/pkg/query/vectorized/measure/plan/scan.go 
b/pkg/query/vectorized/measure/plan/scan.go
new file mode 100644
index 000000000..f54f28726
--- /dev/null
+++ b/pkg/query/vectorized/measure/plan/scan.go
@@ -0,0 +1,87 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package plan
+
+import (
+       "context"
+       "fmt"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/pkg/index"
+       "github.com/apache/skywalking-banyandb/pkg/query/model"
+       "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+// ScanParams holds everything the executor needs to materialize a batch
+// source from a Measure. The analyzer populates these from the proto
+// QueryRequest at plan-build time; the executor (G8c) consults the
+// MeasureExecutionContext and constructs the MeasureBatchResult right
+// before calling Scan.Build.
+type ScanParams struct {
+       Measure         *databasev1.Measure
+       TimeRange       *timestamp.TimeRange
+       Query           index.Query
+       Entities        [][]*modelv1.TagValue
+       TagProjection   []model.TagProjection
+       FieldProjection []string
+}
+
+// Scan is the leaf node of every vec plan. It carries the schema for
+// downstream nodes to consult and the parameters the executor needs to
+// build a source. Source is set by the executor immediately before
+// Build is called; tests can populate it directly to drive Build with a
+// fake source.
+type Scan struct {
+       BatchSchema *vectorized.BatchSchema
+       Source      vectorized.PullOperator
+       Params      ScanParams
+}
+
+// NewScan constructs a Scan node with an analyzed schema and params.
+// Source is unset; the executor or test populates it before Build.
+func NewScan(schema *vectorized.BatchSchema, params ScanParams) *Scan {
+       return &Scan{BatchSchema: schema, Params: params}
+}
+
+// Schema returns the BatchSchema of rows this node emits.
+func (s *Scan) Schema() *vectorized.BatchSchema { return s.BatchSchema }
+
+// Children returns no children — Scan is the leaf.
+func (s *Scan) Children() []VecPlan { return nil }
+
+// Build attaches Source as the pipeline source. The executor must set
+// Source before invoking Build; an unset Source is treated as a
+// programming error (the executor missed a step).
+func (s *Scan) Build(_ context.Context, bc *BuildContext) error {
+       if s.Source == nil {
+               return fmt.Errorf("plan.Scan.Build: Source not set; the 
executor must populate it before Build")
+       }
+       bc.Builder.From(s.Source)
+       return nil
+}
+
+// String returns a single-line debug description.
+func (s *Scan) String() string {
+       name := ""
+       if s.Params.Measure != nil {
+               name = s.Params.Measure.GetMetadata().GetName()
+       }
+       return fmt.Sprintf("Scan(measure=%s)", name)
+}
diff --git a/pkg/query/vectorized/measure/plan/top.go 
b/pkg/query/vectorized/measure/plan/top.go
new file mode 100644
index 000000000..65dc62628
--- /dev/null
+++ b/pkg/query/vectorized/measure/plan/top.go
@@ -0,0 +1,90 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package plan
+
+import (
+       "context"
+       "fmt"
+
+       "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+       measure 
"github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure"
+)
+
+// Top selects the top-N (or bottom-N when Asc) rows by FieldName. Wraps
+// `measure.BatchTop`, which uses a single global heap — the row path's
+// per-timestamp TopN semantic is not yet reproduced here (BatchTop
+// extension is tracked separately; see G6 plan).
+//
+// Schema-preserving.
+type Top struct {
+       Child     VecPlan
+       FieldName string
+       N         int
+       Asc       bool
+}
+
+// NewTop constructs a Top node wrapping child.
+func NewTop(child VecPlan, fieldName string, n int, asc bool) *Top {
+       return &Top{Child: child, FieldName: fieldName, N: n, Asc: asc}
+}
+
+// Schema returns the child's schema (Top is schema-preserving).
+func (t *Top) Schema() *vectorized.BatchSchema {
+       if t.Child == nil {
+               return nil
+       }
+       return t.Child.Schema()
+}
+
+// Children returns the single child.
+func (t *Top) Children() []VecPlan { return []VecPlan{t.Child} }
+
+// Build recurses into the child, locates FieldName in the propagated
+// schema, then attaches a BatchTop as a breaker. The FieldName must
+// reference a field column in the current schema.
+func (t *Top) Build(ctx context.Context, bc *BuildContext) error {
+       if t.Child == nil {
+               return fmt.Errorf("plan.Top.Build: Child is nil")
+       }
+       if buildErr := t.Child.Build(ctx, bc); buildErr != nil {
+               return buildErr
+       }
+       schema := t.Schema()
+       fieldIdx := -1
+       for i, def := range schema.Columns {
+               if def.Role == vectorized.RoleField && def.Name == t.FieldName {
+                       fieldIdx = i
+                       break
+               }
+       }
+       if fieldIdx < 0 {
+               return fmt.Errorf("plan.Top.Build: field %q not present in 
schema", t.FieldName)
+       }
+       op := measure.NewBatchTop(schema, fieldIdx, t.N, t.Asc, 
bc.Config.BatchSize)
+       bc.Builder.Break(op)
+       return nil
+}
+
+// String renders the node as "Top(field=<name>, n=<N>, <asc|desc>)",
+// where the direction is "asc" when Asc is set and "desc" otherwise.
+func (t *Top) String() string {
+       order := "asc"
+       if !t.Asc {
+               order = "desc"
+       }
+       return fmt.Sprintf("Top(field=%s, n=%d, %s)", t.FieldName, t.N, order)
+}

Reply via email to