This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch vectorized-query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit bd9f9a08cadeae8342652ce6cafc1b0d2a2fb42a Author: Hongtao Gao <[email protected]> AuthorDate: Wed May 13 09:56:44 2026 +0000 feat(query/vectorized/measure/plan): VecPlan tree scaffolding (G8a) New package pkg/query/vectorized/measure/plan defines the vectorized measure-query plan tree — a peer of pkg/query/logical/measure (deprecated, row path) that shares no plan node types, executor, or iterator machinery. Per the G8 directive, top-level dispatch routes to one OR the other based on VectorizedConfig.Enabled; no leaf substitution into the row plan. This commit lands the scaffolding: - VecPlan interface: Schema, Children, Build(ctx, *BuildContext), String. - BuildContext: threads the shared PipelineBuilder, MemoryTracker (G7a), and VectorizedConfig through every node's Build call. - Scan: leaf node; carries the analyzed schema and the executor-supplied PullOperator source. Build attaches the source via Builder.From. - Limit: fusible node wrapping BatchLimit; schema-preserving. - Top: breaker node wrapping BatchTop; schema-preserving. Reuses the existing single-heap BatchTop semantics — per-timestamp partitioning is tracked separately. - GroupByAgg: breaker node that delegates operator construction to G7d's BuildOperators, which today emits a single BatchAggregation that does both grouping and folding. Schema-rewriting: drops timestamp and adds the agg result field (the auto-derived `<field>_<func>` column name). - PrintTree: helper to render a tree as multi-line indented text for debugging. Tests (build_test.go) drive the Scan, Limit, and GroupByAgg nodes against a fake PullOperator, verifying: - Scan + Limit emits source batches unchanged when N exceeds the row count. - Scan + GroupByAgg + Limit aggregates by key correctly (svc tag, SUM value). - Scan with unset Source surfaces a programmer-error message. - GroupByAgg.Schema drops timestamp and adds the agg result column with the expected output name. 
--- pkg/query/vectorized/measure/plan/build_test.go | 231 +++++++++++++++++++++++ pkg/query/vectorized/measure/plan/groupby_agg.go | 124 ++++++++++++ pkg/query/vectorized/measure/plan/limit.go | 76 ++++++++ pkg/query/vectorized/measure/plan/plan.go | 100 ++++++++++ pkg/query/vectorized/measure/plan/scan.go | 87 +++++++++ pkg/query/vectorized/measure/plan/top.go | 90 +++++++++ 6 files changed, 708 insertions(+) diff --git a/pkg/query/vectorized/measure/plan/build_test.go b/pkg/query/vectorized/measure/plan/build_test.go new file mode 100644 index 000000000..e04a33d82 --- /dev/null +++ b/pkg/query/vectorized/measure/plan/build_test.go @@ -0,0 +1,231 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package plan + +import ( + "context" + "testing" + + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/pkg/query/model" + "github.com/apache/skywalking-banyandb/pkg/query/vectorized" + measure "github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure" +) + +// fakePullSource emits a fixed sequence of RecordBatches then EOF. 
+type fakePullSource struct {
+	schema  *vectorized.BatchSchema
+	batches []*vectorized.RecordBatch
+	idx     int
+}
+
+// Init is a no-op: the fake holds no resources.
+func (f *fakePullSource) Init(_ context.Context) error { return nil }
+
+// OutputSchema returns the schema the fake was constructed with.
+func (f *fakePullSource) OutputSchema() *vectorized.BatchSchema { return f.schema }
+
+// Close is a no-op: the fake holds no resources.
+func (f *fakePullSource) Close() error { return nil }
+
+// NextBatch hands out the queued batches in order and returns (nil, nil)
+// once they are exhausted, which is the EOF signal this fake uses.
+func (f *fakePullSource) NextBatch(_ context.Context) (*vectorized.RecordBatch, error) {
+	if f.idx >= len(f.batches) {
+		return nil, nil
+	}
+	b := f.batches[f.idx]
+	f.idx++
+	return b, nil
+}
+
+// buildScanInput constructs a BatchSchema + RecordBatch shaped like a Scan
+// output: (timestamp, version, sid, shardID) + (svc tag) + (value field).
+// The aggregation pipeline projects svc + value out of this layout.
+//
+// The five rows are (ts, svc, value) = (1,a,1) (2,b,4) (3,a,2) (4,a,3)
+// (5,b,5), so the per-svc sums are a=6 and b=9.
+func buildScanInput(t *testing.T) (*vectorized.BatchSchema, *vectorized.RecordBatch) {
+	t.Helper()
+	schema := vectorized.NewBatchSchema([]vectorized.ColumnDef{
+		{Role: vectorized.RoleTimestamp, Type: vectorized.ColumnTypeInt64},
+		{Role: vectorized.RoleVersion, Type: vectorized.ColumnTypeInt64},
+		{Role: vectorized.RoleSeriesID, Type: vectorized.ColumnTypeInt64},
+		{Role: vectorized.RoleShardID, Type: vectorized.ColumnTypeInt64},
+		{Role: vectorized.RoleTag, TagFamily: "default", Name: "svc", Type: vectorized.ColumnTypeString},
+		{Role: vectorized.RoleField, Name: "value", Type: vectorized.ColumnTypeInt64},
+	})
+	b := vectorized.NewRecordBatch(schema, 5)
+	pushRow := func(ts int64, svc string, v int64) {
+		b.Columns[0].(*vectorized.TypedColumn[int64]).Append(ts)
+		b.Columns[1].(*vectorized.TypedColumn[int64]).Append(1)
+		b.Columns[2].(*vectorized.TypedColumn[int64]).Append(1)
+		b.Columns[3].(*vectorized.TypedColumn[int64]).Append(0)
+		b.Columns[4].(*vectorized.TypedColumn[string]).Append(svc)
+		b.Columns[5].(*vectorized.TypedColumn[int64]).Append(v)
+	}
+	pushRow(1, "a", 1)
+	pushRow(2, "b", 4)
+	pushRow(3, "a", 2)
+	pushRow(4, "a", 3)
+	pushRow(5, "b", 5)
+	b.Len = 5
+	return schema, b
+}
+
+// newTestBuildContext returns a BuildContext whose PipelineBuilder and
+// Tracker share one MemoryTracker (budget 1<<20), with the BatchSize and
+// QueryMemoryMiB settings every test in this file uses.
+func newTestBuildContext() *BuildContext {
+	tracker := vectorized.NewMemoryTracker(1 << 20)
+	return &BuildContext{
+		Builder: vectorized.NewPipelineBuilder().WithMemoryTracker(tracker),
+		Tracker: tracker,
+		Config:  measure.VectorizedConfig{BatchSize: 1024, QueryMemoryMiB: 1},
+	}
+}
+
+// drainPipeline pulls every batch from the pipeline and returns the slice
+// of returned batches (caller must not modify them after this call).
+func drainPipeline(t *testing.T, p *vectorized.Pipeline) []*vectorized.RecordBatch {
+	t.Helper()
+	var out []*vectorized.RecordBatch
+	for {
+		b, err := p.Next(context.Background())
+		if err != nil {
+			t.Fatalf("pipeline Next: %v", err)
+		}
+		if b == nil {
+			break
+		}
+		out = append(out, b)
+	}
+	return out
+}
+
+// buildAndDrain builds root into a fresh BuildContext, finalizes and
+// initializes the resulting pipeline, and returns every batch it emits.
+// Any failure along the way fails the test immediately.
+func buildAndDrain(t *testing.T, root VecPlan) []*vectorized.RecordBatch {
+	t.Helper()
+	bc := newTestBuildContext()
+	if buildErr := root.Build(context.Background(), bc); buildErr != nil {
+		t.Fatalf("Build: %v", buildErr)
+	}
+	pipeline, err := bc.Builder.Build()
+	if err != nil {
+		t.Fatalf("PipelineBuilder.Build: %v", err)
+	}
+	if initErr := pipeline.Init(context.Background()); initErr != nil {
+		t.Fatal(initErr)
+	}
+	return drainPipeline(t, pipeline)
+}
+
+func TestScanLimit_Build_EmitsSourceBatchesUnchanged(t *testing.T) {
+	schema, batch := buildScanInput(t)
+	src := &fakePullSource{schema: schema, batches: []*vectorized.RecordBatch{batch}}
+
+	scan := NewScan(schema, ScanParams{})
+	scan.Source = src
+	root := NewLimit(scan, 0, 10) // limit > total → no rows dropped
+
+	batches := buildAndDrain(t, root)
+	if len(batches) != 1 {
+		t.Fatalf("want 1 batch, got %d", len(batches))
+	}
+	if batches[0].Len != 5 {
+		t.Fatalf("want 5 rows, got %d", batches[0].Len)
+	}
+}
+
+func TestScanGroupByAggLimit_Build_AggregatesByKey(t *testing.T) {
+	schema, batch := buildScanInput(t)
+	src := &fakePullSource{schema: schema, batches: []*vectorized.RecordBatch{batch}}
+
+	scan := NewScan(schema, ScanParams{})
+	scan.Source = src
+	gba, err := NewGroupByAgg(scan,
+		&model.MeasureGroupBy{TagFamily: "default", TagNames: []string{"svc"}},
+		&model.MeasureAgg{FieldName: "value", Func: modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM},
+	)
+	if err != nil {
+		t.Fatalf("NewGroupByAgg: %v", err)
+	}
+
+	batches := buildAndDrain(t, NewLimit(gba, 0, 10))
+	totalRows := 0
+	for _, b := range batches {
+		totalRows += b.Len
+	}
+	if totalRows != 2 {
+		t.Fatalf("want 2 aggregated rows (2 groups), got %d", totalRows)
+	}
+
+	// Verify by-key sums: a=6, b=9.
+	bySvc := map[string]int64{}
+	for _, b := range batches {
+		svcCol := b.Columns[0].(*vectorized.TypedColumn[string])
+		sumCol := b.Columns[1].(*vectorized.TypedColumn[int64])
+		for i := 0; i < b.Len; i++ {
+			bySvc[svcCol.Data()[i]] = sumCol.Data()[i]
+		}
+	}
+	if bySvc["a"] != 6 {
+		t.Fatalf("sum(a): want 6, got %d", bySvc["a"])
+	}
+	if bySvc["b"] != 9 {
+		t.Fatalf("sum(b): want 9, got %d", bySvc["b"])
+	}
+}
+
+func TestScan_Build_MissingSource_Errors(t *testing.T) {
+	schema, _ := buildScanInput(t)
+	scan := NewScan(schema, ScanParams{}) // Source intentionally unset
+	if err := scan.Build(context.Background(), newTestBuildContext()); err == nil {
+		t.Fatal("Build with nil Source must error")
+	}
+}
+
+// TestTop_Build_NonFieldColumn_Errors checks that Top only accepts a
+// RoleField column as its sort key: "svc" exists in the schema but is a tag.
+func TestTop_Build_NonFieldColumn_Errors(t *testing.T) {
+	schema, batch := buildScanInput(t)
+	scan := NewScan(schema, ScanParams{})
+	scan.Source = &fakePullSource{schema: schema, batches: []*vectorized.RecordBatch{batch}}
+	root := NewTop(scan, "svc", 3, false)
+	if err := root.Build(context.Background(), newTestBuildContext()); err == nil {
+		t.Fatal("Top.Build keyed on a tag column must error")
+	}
+}
+
+func TestTop_Build_NilChild_Errors(t *testing.T) {
+	root := NewTop(nil, "value", 3, false)
+	if err := root.Build(context.Background(), newTestBuildContext()); err == nil {
+		t.Fatal("Top.Build with a nil Child must error")
+	}
+}
+
+func TestGroupByAgg_Schema_DropsTimestampAddsAggField(t *testing.T) {
+	schema, _ := buildScanInput(t)
+	src := &fakePullSource{schema: schema}
+	scan := NewScan(schema, ScanParams{})
+	scan.Source = src
+	gba, err := NewGroupByAgg(scan,
+		&model.MeasureGroupBy{TagFamily: "default", TagNames: []string{"svc"}},
+		&model.MeasureAgg{FieldName: "value", Func: modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM},
+	)
+	if err != nil {
+		t.Fatalf("NewGroupByAgg: %v", err)
+	}
+	out := gba.Schema()
+	if out == nil {
+		t.Fatal("GroupByAgg.Schema must not be nil after BuildOperators resolves it")
+	}
+	if out.TimestampIndex() >= 0 {
+		t.Fatalf("aggregation output must drop timestamp (D2); got index %d", out.TimestampIndex())
+	}
+	// 2 columns: svc key + value_sum agg result.
+	if len(out.Columns) != 2 {
+		t.Fatalf("want 2 output columns (key + agg result), got %d", len(out.Columns))
+	}
+	if out.Columns[0].Name != "svc" {
+		t.Fatalf("col 0 should be the svc key, got %s", out.Columns[0].Name)
+	}
+	if out.Columns[1].Name != "value_sum" {
+		t.Fatalf("col 1 should be value_sum, got %s", out.Columns[1].Name)
+	}
+}
diff --git a/pkg/query/vectorized/measure/plan/groupby_agg.go b/pkg/query/vectorized/measure/plan/groupby_agg.go
new file mode 100644
index 000000000..df678440b
--- /dev/null
+++ b/pkg/query/vectorized/measure/plan/groupby_agg.go
@@ -0,0 +1,124 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.
See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package plan
+
+import (
+	"context"
+	"fmt"
+	"strings"
+
+	"github.com/apache/skywalking-banyandb/pkg/query/model"
+	"github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+	vmeasure "github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure"
+)
+
+// GroupByAgg is the v1 vec aggregation node: GroupBy + single Aggregation
+// fused into one BatchAggregation operator (see G7d planner). The
+// operator does its own grouping via keyIndices and folding via aggs.
+//
+// Schema-rewriting: output is key columns + one agg result column. The
+// timestamp column is dropped, so serializeBatchToProto emits
+// DataPoint.Timestamp == nil for every output row (per G7 design
+// decision D2).
+type GroupByAgg struct {
+	Child       VecPlan
+	GroupBy     *model.MeasureGroupBy
+	Agg         *model.MeasureAgg
+	outputCache *vectorized.BatchSchema
+}
+
+// NewGroupByAgg constructs a GroupByAgg node wrapping child. Returns an
+// error if GroupBy or Agg are nil (v1 requires both: scalar reduce and
+// raw groupby are not supported), or if child is nil.
+func NewGroupByAgg(child VecPlan, groupBy *model.MeasureGroupBy, agg *model.MeasureAgg) (*GroupByAgg, error) {
+	if groupBy == nil {
+		return nil, fmt.Errorf("plan.GroupByAgg: GroupBy must not be nil (scalar reduce not supported in v1)")
+	}
+	if agg == nil {
+		return nil, fmt.Errorf("plan.GroupByAgg: Agg must not be nil (raw groupby not supported in v1)")
+	}
+	if child == nil {
+		return nil, fmt.Errorf("plan.GroupByAgg: Child must not be nil")
+	}
+	return &GroupByAgg{Child: child, GroupBy: groupBy, Agg: agg}, nil
+}
+
+// queryOptions synthesizes the MeasureQueryOptions that BuildOperators
+// expects from this node's GroupBy and Agg; every other option stays at
+// its zero value.
+func (g *GroupByAgg) queryOptions() model.MeasureQueryOptions {
+	return model.MeasureQueryOptions{GroupBy: g.GroupBy, Agg: g.Agg}
+}
+
+// Schema returns the aggregation output schema. The schema is computed
+// lazily on first call by running BuildOperators against the child
+// schema; subsequent calls return the cached value.
+//
+// Schema returns nil (without caching) when Child or its schema is nil,
+// when BuildOperators fails, or when it does not yield exactly one
+// operator. The planner error itself is not surfaced here; Build reports it.
+func (g *GroupByAgg) Schema() *vectorized.BatchSchema {
+	if g.outputCache != nil {
+		return g.outputCache
+	}
+	if g.Child == nil {
+		return nil
+	}
+	inputSchema := g.Child.Schema()
+	if inputSchema == nil {
+		return nil
+	}
+	// A throwaway tracker; we only need the resulting operator's
+	// OutputSchema, not its bookkeeping.
+	tracker := vectorized.NewMemoryTracker(1 << 30)
+	ops, err := vmeasure.BuildOperators(g.queryOptions(), inputSchema, tracker, 1024)
+	if err != nil || len(ops) != 1 {
+		return nil
+	}
+	g.outputCache = ops[0].OutputSchema()
+	return g.outputCache
+}
+
+// Children returns the single child.
+func (g *GroupByAgg) Children() []VecPlan { return []VecPlan{g.Child} }
+
+// Build recurses into child, then constructs the BatchAggregation via
+// BuildOperators and attaches it as a breaker. The pipeline-shared
+// MemoryTracker and BatchSize from bc are passed to the planner.
+//
+// Returns an error if Child is nil, if the child's Build fails, if the
+// child reports a nil schema, or if BuildOperators fails or does not
+// yield exactly one operator.
+func (g *GroupByAgg) Build(ctx context.Context, bc *BuildContext) error {
+	// Fields are exported, so a node may bypass NewGroupByAgg's checks.
+	if g.Child == nil {
+		return fmt.Errorf("plan.GroupByAgg.Build: Child is nil")
+	}
+	if buildErr := g.Child.Build(ctx, bc); buildErr != nil {
+		return buildErr
+	}
+	inputSchema := g.Child.Schema()
+	if inputSchema == nil {
+		return fmt.Errorf("plan.GroupByAgg.Build: child %s has nil schema", g.Child)
+	}
+	ops, opsErr := vmeasure.BuildOperators(g.queryOptions(), inputSchema, bc.Tracker, bc.Config.BatchSize)
+	if opsErr != nil {
+		return fmt.Errorf("plan.GroupByAgg.Build: %w", opsErr)
+	}
+	if len(ops) != 1 {
+		return fmt.Errorf("plan.GroupByAgg.Build: expected 1 operator, got %d", len(ops))
+	}
+	bc.Builder.Break(ops[0])
+	// Cache for subsequent Schema() queries — saves re-running the planner.
+	g.outputCache = ops[0].OutputSchema()
+	return nil
+}
+
+// String returns a single-line debug description. It is safe on a node
+// whose GroupBy or Agg is nil; the corresponding parts render empty.
+func (g *GroupByAgg) String() string {
+	tagNames := ""
+	if g.GroupBy != nil {
+		tagNames = strings.Join(g.GroupBy.TagNames, ",")
+	}
+	fnName, field := "", ""
+	if g.Agg != nil {
+		// fmt.Sprint formats the enum exactly as %v would.
+		fnName = fmt.Sprint(g.Agg.Func)
+		field = g.Agg.FieldName
+	}
+	return fmt.Sprintf("GroupByAgg(keys=%s, fn=%s, field=%s)", tagNames, fnName, field)
+}
diff --git a/pkg/query/vectorized/measure/plan/limit.go b/pkg/query/vectorized/measure/plan/limit.go
new file mode 100644
index 000000000..2c3d1f5f9
--- /dev/null
+++ b/pkg/query/vectorized/measure/plan/limit.go
@@ -0,0 +1,76 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package plan
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+	measure "github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure"
+)
+
+// Limit applies offset+limit windowing as a fusible operator on the
+// pipeline. Schema-preserving: emits the same column layout as its child.
+//
+// N == 0 means "no limit". In that case Build attaches no operator at all,
+// so Offset is not applied either. The analyzer normalises
+// QueryRequest.Limit of zero to the row-path default (100) before
+// constructing this node.
+type Limit struct {
+	Child  VecPlan
+	Offset uint32
+	N      uint32
+}
+
+// NewLimit constructs a Limit node wrapping child.
+func NewLimit(child VecPlan, offset, n uint32) *Limit {
+	return &Limit{
+		Child:  child,
+		Offset: offset,
+		N:      n,
+	}
+}
+
+// Schema reports the child's schema unchanged, since Limit only drops
+// rows. A Limit without a child has no schema and reports nil.
+func (l *Limit) Schema() *vectorized.BatchSchema {
+	if child := l.Child; child != nil {
+		return child.Schema()
+	}
+	return nil
+}
+
+// Children reports the wrapped child as a one-element slice (the element
+// is nil when Child is unset).
+func (l *Limit) Children() []VecPlan {
+	children := []VecPlan{l.Child}
+	return children
+}
+
+// Build first builds the child subtree and then, when N is non-zero,
+// appends a fusible BatchLimit over the child's schema. With N == 0 the
+// pipeline is left untouched and the child's output flows through as-is.
+// A nil Child is reported as an error.
+func (l *Limit) Build(ctx context.Context, bc *BuildContext) error {
+	child := l.Child
+	if child == nil {
+		return fmt.Errorf("plan.Limit.Build: Child is nil")
+	}
+	if err := child.Build(ctx, bc); err != nil {
+		return err
+	}
+	if l.N != 0 {
+		bc.Builder.Apply(measure.NewBatchLimit(l.Schema(), l.Offset, l.N))
+	}
+	return nil
+}
+
+// String renders the window parameters for plan-tree debugging.
+func (l *Limit) String() string {
+	const format = "Limit(offset=%d, n=%d)"
+	return fmt.Sprintf(format, l.Offset, l.N)
+}
diff --git a/pkg/query/vectorized/measure/plan/plan.go b/pkg/query/vectorized/measure/plan/plan.go
new file mode 100644
index 000000000..e061802c7
--- /dev/null
+++ b/pkg/query/vectorized/measure/plan/plan.go
@@ -0,0 +1,100 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package plan is the vectorized measure-query plan tree (G8).
+//
+// This package is a peer of pkg/query/logical/measure (deprecated, row
+// path); the two share no plan-node types, executor wiring, or iterator
+// machinery. Top-level dispatch (banyand/query/processor.go, G8d) routes
+// requests to one OR the other based on VectorizedConfig.Enabled.
+//
+// A VecPlan node knows its output BatchSchema, its children, and how to
+// append itself to a *vectorized.PipelineBuilder during Build. Build is
+// bottom-up: a node calls Build on its child first, then attaches its own
+// operator. Build composes the shared BuildContext.Builder in place and
+// returns only an error; once the root's Build succeeds, the executor
+// (G8c) finalizes that builder via Builder.Build() to produce a
+// *vectorized.Pipeline.
+package plan
+
+import (
+	"context"
+	"strings"
+
+	"github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+	measure "github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure"
+)
+
+// VecPlan is the vectorized measure-query plan node interface.
+//
+// Schema returns the BatchSchema of rows this node emits — for fusible
+// or schema-preserving breakers it matches the input schema; for
+// schema-rewriting breakers (BatchAggregation) it is the operator's
+// OutputSchema.
+//
+// Children returns the immediate child plan nodes (zero for leaves).
+//
+// Build appends this node's contribution to bc.Builder. The leaf (Scan)
+// calls Builder.From; fusible nodes call Builder.Apply; breakers call
+// Builder.Break. Build must call Build on its children before attaching
+// itself so the source flows in tree order.
+//
+// String returns a single-line debug description ("Scan(measure=foo)",
+// "GroupByAgg(keys=svc, fn=AGGREGATION_FUNCTION_SUM, field=value)", etc.)
+// so plan trees can be pretty-printed with PrintTree.
+type VecPlan interface {
+	Schema() *vectorized.BatchSchema
+	Children() []VecPlan
+	Build(ctx context.Context, bc *BuildContext) error
+	String() string
+}
+
+// BuildContext is the cross-cutting state threaded through every Build
+// call. Builder accumulates operators; Tracker is the shared per-pipeline
+// MemoryTracker (G7a) every memory-bookkeeping operator must use; Config
+// supplies BatchSize and other runtime knobs.
+type BuildContext struct {
+	Builder *vectorized.PipelineBuilder
+	Tracker *vectorized.MemoryTracker
+	Config  measure.VectorizedConfig
+}
+
+// PrintTree renders a plan tree as multi-line text, one node per line, in
+// pre-order. The root is printed first at column zero and every child is
+// indented two spaces deeper than its parent. A nil node (for example the
+// unset Child a Limit or Top reports from Children) is rendered as "<nil>"
+// instead of panicking. Useful for debugging analyzer output.
+func PrintTree(root VecPlan) string {
+	var sb strings.Builder
+	printNode(&sb, root, 0)
+	return sb.String()
+}
+
+// printNode writes node, indented by depth levels of two spaces, and then
+// recursively writes its children one level deeper.
+func printNode(sb *strings.Builder, node VecPlan, depth int) {
+	sb.WriteString(strings.Repeat("  ", depth))
+	if node == nil {
+		sb.WriteString("<nil>\n")
+		return
+	}
+	sb.WriteString(node.String())
+	sb.WriteString("\n")
+	for _, child := range node.Children() {
+		printNode(sb, child, depth+1)
+	}
+}
diff --git a/pkg/query/vectorized/measure/plan/scan.go b/pkg/query/vectorized/measure/plan/scan.go
new file mode 100644
index 000000000..f54f28726
--- /dev/null
+++ b/pkg/query/vectorized/measure/plan/scan.go
@@ -0,0 +1,87 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package plan
+
+import (
+	"context"
+	"fmt"
+
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+	"github.com/apache/skywalking-banyandb/pkg/index"
+	"github.com/apache/skywalking-banyandb/pkg/query/model"
+	"github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+// ScanParams holds everything the executor needs to materialize a batch
+// source from a Measure. The analyzer populates these from the proto
+// QueryRequest at plan-build time; the executor (G8c) consults the
+// MeasureExecutionContext and constructs the MeasureBatchResult right
+// before calling Scan.Build.
+type ScanParams struct {
+	Measure         *databasev1.Measure
+	TimeRange       *timestamp.TimeRange
+	Query           index.Query
+	Entities        [][]*modelv1.TagValue
+	TagProjection   []model.TagProjection
+	FieldProjection []string
+}
+
+// Scan is the leaf of every vec plan. BatchSchema is the analyzed schema
+// that parent nodes read through Schema; Params is what the executor uses
+// to build the batch source; Source is that source. The executor assigns
+// Source right before Build, and tests may assign a fake directly.
+type Scan struct {
+	BatchSchema *vectorized.BatchSchema
+	Source      vectorized.PullOperator
+	Params      ScanParams
+}
+
+// NewScan returns a Scan carrying schema and params. Source is left nil
+// and must be assigned before Build is called.
+func NewScan(schema *vectorized.BatchSchema, params ScanParams) *Scan {
+	return &Scan{
+		BatchSchema: schema,
+		Params:      params,
+	}
+}
+
+// Schema reports the analyzed schema this leaf emits.
+func (s *Scan) Schema() *vectorized.BatchSchema {
+	return s.BatchSchema
+}
+
+// Children reports that a Scan has nothing beneath it.
+func (s *Scan) Children() []VecPlan {
+	return nil
+}
+
+// Build registers Source as the pipeline's source via Builder.From. A nil
+// Source means the executor skipped a step, and Build reports it as an
+// error rather than building a sourceless pipeline.
+func (s *Scan) Build(_ context.Context, bc *BuildContext) error {
+	if src := s.Source; src != nil {
+		bc.Builder.From(src)
+		return nil
+	}
+	return fmt.Errorf("plan.Scan.Build: Source not set; the executor must populate it before Build")
+}
+
+// String renders the scanned measure's name for plan-tree debugging. The
+// name is empty when Params.Measure is unset.
+func (s *Scan) String() string {
+	var measureName string
+	if m := s.Params.Measure; m != nil {
+		measureName = m.GetMetadata().GetName()
+	}
+	return fmt.Sprintf("Scan(measure=%s)", measureName)
+}
diff --git a/pkg/query/vectorized/measure/plan/top.go b/pkg/query/vectorized/measure/plan/top.go
new file mode 100644
index 000000000..65dc62628
--- /dev/null
+++ b/pkg/query/vectorized/measure/plan/top.go
@@ -0,0 +1,90 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package plan
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+	measure "github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure"
+)
+
+// Top keeps the N highest rows by FieldName, or the N lowest when Asc is
+// set. It wraps `measure.BatchTop`, which uses a single global heap; the
+// row path's per-timestamp TopN semantic is not reproduced here yet
+// (BatchTop extension is tracked separately; see G6 plan).
+//
+// Schema-preserving.
+type Top struct {
+	Child     VecPlan
+	FieldName string
+	N         int
+	Asc       bool
+}
+
+// NewTop constructs a Top node wrapping child.
+func NewTop(child VecPlan, fieldName string, n int, asc bool) *Top {
+	return &Top{Child: child, FieldName: fieldName, N: n, Asc: asc}
+}
+
+// Schema returns the child's schema (Top is schema-preserving), or nil
+// when Child is unset.
+func (t *Top) Schema() *vectorized.BatchSchema {
+	if t.Child == nil {
+		return nil
+	}
+	return t.Child.Schema()
+}
+
+// Children returns the single child.
+func (t *Top) Children() []VecPlan { return []VecPlan{t.Child} }
+
+// Build recurses into the child, locates FieldName in the propagated
+// schema, then attaches a BatchTop as a breaker. FieldName must name a
+// RoleField column in the current schema; tag columns are not accepted.
+//
+// Returns an error if Child is nil, if the child's Build fails, if the
+// child reports a nil schema (for example a GroupByAgg whose planner
+// rejected its inputs), or if FieldName is not a field column.
+func (t *Top) Build(ctx context.Context, bc *BuildContext) error {
+	if t.Child == nil {
+		return fmt.Errorf("plan.Top.Build: Child is nil")
+	}
+	if buildErr := t.Child.Build(ctx, bc); buildErr != nil {
+		return buildErr
+	}
+	schema := t.Schema()
+	if schema == nil {
+		// Guard the schema.Columns dereference below.
+		return fmt.Errorf("plan.Top.Build: child %s has nil schema", t.Child)
+	}
+	fieldIdx := -1
+	for i, def := range schema.Columns {
+		if def.Role == vectorized.RoleField && def.Name == t.FieldName {
+			fieldIdx = i
+			break
+		}
+	}
+	if fieldIdx < 0 {
+		return fmt.Errorf("plan.Top.Build: field %q not present in schema", t.FieldName)
+	}
+	op := measure.NewBatchTop(schema, fieldIdx, t.N, t.Asc, bc.Config.BatchSize)
+	bc.Builder.Break(op)
+	return nil
+}
+
+// String returns a single-line debug description.
+func (t *Top) String() string {
+	dir := "desc"
+	if t.Asc {
+		dir = "asc"
+	}
+	return fmt.Sprintf("Top(field=%s, n=%d, %s)", t.FieldName, t.N, dir)
+}
